diff options
Diffstat (limited to 'java/broker/src/main/java/org')
38 files changed, 1056 insertions, 1083 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index b3c843ebaa..7216841a94 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -52,9 +52,9 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.ManagedBroker; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.transactionlog.TransactionLog; import org.apache.qpid.server.routing.RoutingTable; diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 2d0589c223..bb7331e5f7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,6 +20,14 @@ */ package org.apache.qpid.server; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -29,12 +37,12 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; @@ -45,19 +53,15 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.subscription.RecordDeliveryMethod; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.transactionlog.TransactionLog; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - public class AMQChannel { public static final int DEFAULT_PREFETCH = 5000; @@ -114,9 +118,6 @@ public class AMQChannel public AMQChannel(AMQProtocolSession session, int channelId, TransactionLog transactionLog) throws AMQException { - //Set values from configuration - Configurator.configure(this); - _session = session; _channelId = channelId; _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index f3b54034e7..780a17940e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -27,36 +33,23 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.FixedSizeByteBufferAllocator; +import org.apache.mina.common.IoAcceptor; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.AMQException; +import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.management.JMXManagedObjectRegistry; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; import org.apache.qpid.server.protocol.AMQPProtocolProvider; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; -import org.apache.qpid.url.URLSyntaxException; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.List; /** * Main entry point for AMQPD. @@ -200,13 +193,6 @@ public class Main _brokerLogger.error("Initialisation Error : " + e.getMessage()); shutdown(1); } - catch (ConfigurationException e) - { - System.out.println("Error configuring message broker: " + e); - _brokerLogger.error("Error configuring message broker: " + e); - e.printStackTrace(); - shutdown(1); - } catch (Throwable e) { System.out.println("Error initialising message broker: " + e); @@ -223,7 +209,7 @@ public class Main System.exit(status); } - protected void startup() throws InitException, ConfigurationException, Exception + protected void startup() throws Exception { final String QpidHome = System.getProperty(QPID_HOME); final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); @@ -259,40 +245,32 @@ public class Main } ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); - - - updateManagementPort(config.getConfiguration(), commandLine.getOptionValue("m")); - - + ServerConfiguration serverConfig = config.getConfiguration(); + updateManagementPort(serverConfig, commandLine.getOptionValue("m")); ApplicationRegistry.initialise(config); - //fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues // that are causing the broker build to pick up the wrong properties file and hence say // Starting Qpid Client _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); - ConnectorConfiguration connectorConfig = - ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class); - - ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers); + ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers()); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done - if (!connectorConfig.enablePooledAllocator) + if (!serverConfig.getEnablePooledAllocator()) { ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } - - if(connectorConfig.useBiasedWrites) + if(serverConfig.getUseBiasedWrites()) { System.setProperty("org.apache.qpid.use_write_biased_pool","true"); } - int port = connectorConfig.port; + int port = serverConfig.getPort(); String portStr = commandLine.getOptionValue("p"); if (portStr != null) @@ -306,29 +284,8 @@ public class Main throw new InitException("Invalid port: " + portStr, e); } } - - String VIRTUAL_HOSTS = "virtualhosts"; - - Object virtualHosts = ApplicationRegistry.getInstance().getConfiguration().getProperty(VIRTUAL_HOSTS); - - if (virtualHosts != null) - { - if (virtualHosts instanceof Collection) - { - int totalVHosts = ((Collection) virtualHosts).size(); - for (int vhost = 0; vhost < totalVHosts; vhost++) - { - setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost)); - } - } - else - { - setupVirtualHosts(configFile.getParent(), (String) virtualHosts); - } - } - - bind(port, connectorConfig); - + + bind(port, serverConfig); } /** @@ -336,86 +293,59 @@ public class Main * @param configuration * @param managementPort The string from the command line */ - private void updateManagementPort(Configuration configuration, String managementPort) + private void updateManagementPort(ServerConfiguration configuration, String managementPort) { if (managementPort != null) { - int mport; - int defaultMPort = configuration.getInt(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH); try { - mport = Integer.parseInt(managementPort); - configuration.setProperty(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH, mport); + configuration.setJMXManagementPort(Integer.parseInt(managementPort)); } catch (NumberFormatException e) { - _logger.warn("Invalid management port: " + managementPort + " will use default:" + defaultMPort, e); + _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); } } } - protected void setupVirtualHosts(String configFileParent, String configFilePath) - throws ConfigurationException, AMQException, URLSyntaxException - { - String configVar = "${conf}"; - - if (configFilePath.startsWith(configVar)) - { - configFilePath = configFileParent + configFilePath.substring(configVar.length()); - } - - if (configFilePath.indexOf(".xml") != -1) - { - VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath); - vHostConfig.performBindings(); - } - else - { - // the virtualhosts value is a path. Search it for XML files. - - File virtualHostDir = new File(configFilePath); - - String[] fileNames = virtualHostDir.list(); - - for (int each = 0; each < fileNames.length; each++) - { - if (fileNames[each].endsWith(".xml")) - { - VirtualHostConfiguration vHostConfig = - new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]); - vHostConfig.performBindings(); - } - } - } - } - - protected void bind(int port, ConnectorConfiguration connectorConfig) throws BindException + protected void bind(int port, ServerConfiguration config) throws BindException { String bindAddr = commandLine.getOptionValue("b"); if (bindAddr == null) { - bindAddr = connectorConfig.bindAddress; + bindAddr = config.getBind(); } try { - // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); - IoAcceptor acceptor = connectorConfig.createAcceptor(); + IoAcceptor acceptor; + + if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO()) + { + _logger.warn("Using Qpid Multithreaded IO Processing"); + acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor()); + } + else + { + _logger.warn("Using Mina IO Processing"); + acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor()); + } + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); - sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); - sc.setTcpNoDelay(connectorConfig.tcpNoDelay); + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + sc.setSendBufferSize(config.getWriteBufferSize()); + sc.setTcpNoDelay(config.getTcpNoDelay()); // if we do not use the executor pool threading model we get the default leader follower // implementation provided by MINA - if (connectorConfig.enableExecutorPool) + if (config.getEnableExecutorPool()) { sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); } - if (!connectorConfig.enableSSL || !connectorConfig.sslOnly) + if (!config.getEnableSSL() || !config.getSSLOnly()) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); InetSocketAddress bindAddress; @@ -434,16 +364,16 @@ public class Main _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } - if (connectorConfig.enableSSL) + if (config.getEnableSSL()) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); try { - bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + bind(acceptor, new InetSocketAddress(config.getSSLPort()), handler, sconfig); //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); + _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort()); } catch (IOException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java deleted file mode 100644 index 31c1b61a21..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/Configurator.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration; - -import java.lang.reflect.Field; - -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.configuration.PropertyException; -import org.apache.qpid.configuration.PropertyUtils; -import org.apache.qpid.server.registry.ApplicationRegistry; - -/** - * This class contains utilities for populating classes automatically from values pulled from configuration - * files. - */ -public class Configurator -{ - private static final Logger _logger = Logger.getLogger(Configurator.class); - - - /** - * Configure a given instance using the supplied configuration. Note that superclasses are <b>not</b> - * currently configured but this could easily be added if required. - * @param instance the instance to configure - * @param config the configuration to use to configure the object - */ - public static void configure(Object instance, Configuration config) - { - - for (Field f : instance.getClass().getDeclaredFields()) - { - Configured annotation = f.getAnnotation(Configured.class); - if (annotation != null) - { - setValueInField(f, instance, config, annotation); - } - } - } - - - - /** - * Configure a given instance using the application configuration. Note that superclasses are <b>not</b> - * currently configured but this could easily be added if required. - * @param instance the instance to configure - */ - public static void configure(Object instance) - { - configure(instance, ApplicationRegistry.getInstance().getConfiguration()); - } - - private static void setValueInField(Field f, Object instance, Configuration config, Configured annotation) - { - Class fieldClass = f.getType(); - String configPath = annotation.path(); - try - { - if (fieldClass == String.class) - { - String val = config.getString(configPath, annotation.defaultValue()); - val = PropertyUtils.replaceProperties(val); - f.set(instance, val); - } - else if (fieldClass == int.class) - { - int val = config.getInt(configPath, Integer.parseInt(annotation.defaultValue())); - f.setInt(instance, val); - } - else if (fieldClass == long.class) - { - long val = config.getLong(configPath, Long.parseLong(annotation.defaultValue())); - f.setLong(instance, val); - } - else if (fieldClass == double.class) - { - double val = config.getDouble(configPath, Double.parseDouble(annotation.defaultValue())); - f.setDouble(instance, val); - } - else if (fieldClass == boolean.class) - { - boolean val = config.getBoolean(configPath, Boolean.parseBoolean(annotation.defaultValue())); - f.setBoolean(instance, val); - } - else - { - _logger.error("Unsupported field type " + fieldClass + " for " + f + " IGNORING configured value"); - } - } - catch (PropertyException e) - { - _logger.error("Unable to expand property: " + e + " INGORING field " + f, e); - } - catch (IllegalAccessException e) - { - _logger.error("Unable to access field " + f + " IGNORING configured value"); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/ManagementConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfiguration.java index 042f626e8b..c7cf0c0892 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/ManagementConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfiguration.java @@ -18,13 +18,41 @@ * under the License. * */ -package org.apache.qpid.server.management; +package org.apache.qpid.server.configuration; -import org.apache.qpid.configuration.Configured; +import org.apache.commons.configuration.Configuration; -public class ManagementConfiguration + +public class ExchangeConfiguration { - @Configured(path = "management.enabled", - defaultValue = "true") - public boolean enabled; + + private Configuration _config; + private String _name; + + public ExchangeConfiguration(String exchName, Configuration subset) + { + _name = exchName; + _config = subset; + } + + public String getName() + { + return _name; + } + + public String getType() + { + return _config.getString("type","direct"); + } + + public boolean getDurable() + { + return _config.getBoolean("durable", false); + } + + public boolean getAutoDelete() + { + return _config.getBoolean("autodelete",false); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java new file mode 100644 index 0000000000..90d6caec99 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import java.util.List; + +import org.apache.commons.configuration.Configuration; + +public class QueueConfiguration +{ + + // FIXME AIDAN XXX -- deal with defaults + + private Configuration _config; + private String _name; + + public QueueConfiguration(String name, Configuration config) + { + _config = config; + _name = name; + } + + public boolean getDurable() + { + return _config.getBoolean("durable" ,false); + } + + public boolean getAutoDelete() + { + return _config.getBoolean("autodelete", false); + } + + public String getOwner() + { + return _config.getString("owner", null); + } + + public boolean getPriority() + { + return _config.getBoolean("priority", false); + } + + public int getPriorities() + { + return _config.getInt("priorities", -1); + } + + public String getExchange() + { + return _config.getString("exchange", null); + } + + public List getRoutingKeys() + { + return _config.getList("routingKey"); + } + + public String getName() + { + return _name; + } + + public long getMaximumMessageAge() + { + return _config.getLong("maximumMessageAge", 0); + } + + public long getMaximumQueueDepth() + { + return _config.getLong("maximumQueueDepth", 0); + } + + public long getMaximumMessageSize() + { + return _config.getLong("maximumMessageSize", 0); + } + + public long getMaximumMessageCount() + { + return _config.getLong("maximumMessageCount", 0); + } + + public long getMinimumAlertRepeatGap() + { + return _config.getLong("minimumAlertRepeatGap", 0); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java new file mode 100644 index 0000000000..5d080f8df1 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/SecurityConfiguration.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.server.configuration; + +import org.apache.commons.configuration.Configuration; + +public class SecurityConfiguration +{ + + private Configuration _conf; + + public SecurityConfiguration(Configuration configuration) + { + _conf = configuration; + } + + public Configuration getConfiguration() + { + return _conf; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java new file mode 100644 index 0000000000..225042e9ec --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.configuration; + +import java.io.File; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.SystemConfiguration; +import org.apache.commons.configuration.XMLConfiguration; + +public class ServerConfiguration +{ + + private static Configuration _config; + + private static final int DEFAULT_FRAME_SIZE = 65536; + private static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144; + private static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144; + private static final int DEFAULT_PORT = 5672; + private static final int DEFAUL_SSL_PORT = 8672; + private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; + private static final int DEFAULT_JMXPORT = 8999; + + private long _housekeepingExpiredMessageCheckPeriod = DEFAULT_HOUSEKEEPING_PERIOD; + private static int _jmxPort = DEFAULT_JMXPORT; + + private Map<String, VirtualHostConfiguration> _virtualHosts = new HashMap<String, VirtualHostConfiguration>(); + private SecurityConfiguration _securityConfiguration = null; + + public ServerConfiguration(File configurationURL) throws ConfigurationException + { + this(config(configurationURL)); + } + + public ServerConfiguration(Configuration conf) throws ConfigurationException + { + _config = conf; + _jmxPort = _config.getInt("management.jmxport", 8999); + + _securityConfiguration = new SecurityConfiguration(conf.subset("security")); + + List vhosts = conf.getList("virtualhosts"); + Iterator i = vhosts.iterator(); + while (i.hasNext()) + { + Object thing = i.next(); + if (thing instanceof String) + { + XMLConfiguration vhostConfiguration = new XMLConfiguration((String) thing); + List hosts = vhostConfiguration.getList("virtualhost.name"); + for (int j = 0; j < hosts.size(); j++) + { + String name = (String) hosts.get(j); + CompositeConfiguration mungedConf = new CompositeConfiguration(); + mungedConf.addConfiguration(conf.subset("virtualhosts.virtualhost."+name)); + mungedConf.addConfiguration(vhostConfiguration.subset("virtualhost." + name)); + VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration(name, mungedConf); + _virtualHosts.put(vhostConfig.getName(), vhostConfig); + } + } + } + } + + public static String[] objListToStringArray(List objList) + { + String[] networkStrings = new String[objList.size()]; + int i = 0; + for (Object network : objList) + { + networkStrings[i++] = (String) network; + } + return networkStrings; + } + + // Our configuration class needs to make the interpolate method + // public so it can be called below from the config method. + private static class MyConfiguration extends CompositeConfiguration + { + public String interpolate(String obj) + { + return super.interpolate(obj); + } + } + + private final static Configuration config(File url) throws ConfigurationException + { + // We have to override the interpolate methods so that + // interpolation takes place accross the entirety of the + // composite configuration. Without doing this each + // configuration object only interpolates variables defined + // inside itself. + final MyConfiguration conf = new MyConfiguration(); + conf.addConfiguration(new SystemConfiguration() + { + protected String interpolate(String o) + { + return conf.interpolate(o); + } + }); + conf.addConfiguration(new XMLConfiguration(url) + { + protected String interpolate(String o) + { + return conf.interpolate(o); + } + }); + return conf; + } + + public void setJMXManagementPort(int mport) + { + _jmxPort = mport; + } + + public int getJMXManagementPort() + { + return _jmxPort; + } + + public boolean getPlatformMbeanserver() + { + return _config.getBoolean("management.platform-mbeanserver", true); + } + + public String[] getVirtualHosts() + { + return _virtualHosts.keySet().toArray(new String[_virtualHosts.size()]); + } + + public String getPluginDirectory() + { + return _config.getString("plugin-directory"); + } + + public VirtualHostConfiguration getVirtualHostConfig(String name) + { + return _virtualHosts.get(name); + } + + public List<String> getPrincipalDatabaseNames() + { + return _config.getList("security.principal-databases.principal-database.name"); + } + + public List<String> getPrincipalDatabaseClass() + { + return _config.getList("security.principal-databases.principal-database.class"); + } + + public List<String> getPrincipalDatabaseAttributeNames(int index) + { + String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.name"; + return _config.getList(name); + } + + public List<String> getPrincipalDatabaseAttributeValues(int index) + { + String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.value"; + return _config.getList(name); + } + + public List<String> getManagementPrincipalDBs() + { + return _config.getList("security.jmx.principal-database"); + } + + public List<String> getManagementAccessList() + { + return _config.getList("security.jmx.access"); + } + + public int getFrameSize() + { + return _config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE); + } + + public boolean getManagementSecurityEnabled() + { + return _config.getBoolean("management.security-enabled", false); + } + + public boolean getProtectIOEnabled() + { + return _config.getBoolean("broker.connector.protectio.enabled", false); + } + + public int getBufferReadLimit() + { + return _config.getInt("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE); + } + + public int getBufferWriteLimit() + { + return _config.getInt("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE); + } + + public boolean getSynchedClocks() + { + return _config.getBoolean("advanced.synced-clocks", false); + } + + public boolean getMsgAuth() + { + return _config.getBoolean("security.msg-auth", false); + } + + public String getJMXPrincipalDatabase() + { + return _config.getString("security.jmx.principal-database"); + } + + public String getManagementKeyStorePath() + { + return _config.getString("management.ssl.keyStorePath", null); + } + + public boolean getManagementSSLEnabled() + { + return _config.getBoolean("management.ssl.enabled", true); + } + + public String getManagementKeyStorePassword() + { + return _config.getString("management.ssl.keyStorePassword"); + } + + public SecurityConfiguration getSecurityConfiguration() + { + return _securityConfiguration; + } + + public boolean getQueueAutoRegister() + { + return _config.getBoolean("queue.auto_register", true); + } + + public boolean getManagementEnabled() + { + return _config.getBoolean("management.enabled", true); + } + + public int getHeartBeatDelay() + { + return _config.getInt("heartbeat.delay", 5); + } + + public double getHeartBeatTimeout() + { + return _config.getDouble("heartbeat.timeoutFactor", 2.0); + } + + public int getDeliveryPoolSize() + { + return _config.getInt("delivery.poolsize", 0); + } + + public long getMaximumMessageAge() + { + return _config.getLong("maximumMessageAge", 0); + } + + public long getMaximumMessageCount() + { + return _config.getLong("maximumMessageCount", 0); + } + + public long getMaximumQueueDepth() + { + return _config.getLong("maximumQueueDepth", 0); + } + + public long getMaximumMessageSize() + { + return _config.getLong("maximumMessageSize", 0); + } + + public long getMinimumAlertRepeatGap() + { + return _config.getLong("minimumAlertRepeatGap", 0); + } + + public int getProcessors() + { + return _config.getInt("connector.processors", 4); + } + + public int getPort() + { + return _config.getInt("connector.port", DEFAULT_PORT); + } + + public String getBind() + { + return _config.getString("connector.bind", "wildcard"); + } + + public int getReceiveBufferSize() + { + return _config.getInt("connector.socketReceiveBuffer", 32767); + } + + public int getWriteBufferSize() + { + return _config.getInt("connector.socketWriteBuffer", 32767); + } + + public boolean getTcpNoDelay() + { + return _config.getBoolean("connector.tcpNoDelay", true); + } + + public boolean getEnableExecutorPool() + { + return _config.getBoolean("advanced.filterchain[@enableExecutorPool]", false); + } + + public boolean getEnablePooledAllocator() + { + return _config.getBoolean("advanced.enablePooledAllocator", false); + } + + public boolean getEnableDirectBuffers() + { + return _config.getBoolean("advanced.enableDirectBuffers", false); + } + + public boolean getEnableSSL() + { + return _config.getBoolean("connector.ssl.enabled", false); + } + + public boolean getSSLOnly() + { + return _config.getBoolean("connector.ssl.sslOnly", true); + } + + public int getSSLPort() + { + return _config.getInt("connector.ssl.port", DEFAUL_SSL_PORT); + } + + public String getKeystorePath() + { + return _config.getString("connector.ssl.keystorePath", "none"); + } + + public String getKeystorePassword() + { + return _config.getString("connector.ssl.keystorePassword", "none"); + } + + public String getCertType() + { + return _config.getString("connector.ssl.certType", "SunX509"); + } + + public boolean getQpidNIO() + { + return _config.getBoolean("connector.qpidnio", false); + } + + public boolean getUseBiasedWrites() + { + return _config.getBoolean("advanced.useWriteBiasedPool", false); + } + + public String getDefaultVirtualHost() + { + return _config.getString("virtualhosts.default"); + } + + public void setHousekeepingExpiredMessageCheckPeriod(long _housekeepingExpiredMessageCheckPeriod) + { + this._housekeepingExpiredMessageCheckPeriod = _housekeepingExpiredMessageCheckPeriod; + } + + public long getHousekeepingExpiredMessageCheckPeriod() + { + return _housekeepingExpiredMessageCheckPeriod; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 02fb57c730..91d0b8d8da 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -20,266 +20,111 @@ */ package org.apache.qpid.server.configuration; -import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.store.MemoryMessageStore; public class VirtualHostConfiguration { - private static final Logger _logger = Logger.getLogger(VirtualHostConfiguration.class); + private Configuration _config; + private String _name; + private Map<String, QueueConfiguration> _queues = new HashMap<String, QueueConfiguration>(); + private Map<String, ExchangeConfiguration> _exchanges = new HashMap<String, ExchangeConfiguration>(); - private static XMLConfiguration _config; - private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost."; - - - public VirtualHostConfiguration(String configFile) throws ConfigurationException + public VirtualHostConfiguration(String name, Configuration config) throws ConfigurationException { - _logger.info("Loading Config file:" + configFile); - - _config = new XMLConfiguration(configFile); - - } - - - - private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException - { - _logger.debug("Loding configuration for virtaulhost: "+virtualHostName); - - - VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); + _config = config; + _name = name; + Iterator i = _config.getList("queues.queue.name").iterator(); - - if(virtualHost == null) - { - throw new ConfigurationException("Unknown virtual host: " + virtualHostName); - } - - List exchangeNames = configuration.getList("exchanges.exchange.name"); - - for(Object exchangeNameObj : exchangeNames) + while (i.hasNext()) { - String exchangeName = String.valueOf(exchangeNameObj); - configureExchange(virtualHost, exchangeName, configuration); + String queueName = (String) i.next(); + CompositeConfiguration mungedConf = new CompositeConfiguration(); + mungedConf.addConfiguration(_config.subset("queues.queue." + queueName)); + mungedConf.addConfiguration(_config.subset("queues")); + _queues.put(queueName, new QueueConfiguration(queueName, mungedConf)); } - - List queueNames = configuration.getList("queues.queue.name"); - - for(Object queueNameObj : queueNames) + i = _config.getList("exchanges.exchange.name").iterator(); + int count = 0; + while (i.hasNext()) { - String queueName = String.valueOf(queueNameObj); - configureQueue(virtualHost, queueName, configuration); + CompositeConfiguration mungedConf = new CompositeConfiguration(); + mungedConf.addConfiguration(config.subset("exchanges.exchange(" + count++ + ")")); + mungedConf.addConfiguration(_config.subset("exchanges")); + String exchName = (String) i.next(); + _exchanges.put(exchName, new ExchangeConfiguration(exchName, mungedConf)); } - } - private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException + public String getName() { - - CompositeConfiguration exchangeConfiguration = new CompositeConfiguration(); - - exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString)); - exchangeConfiguration.addConfiguration(configuration.subset("exchanges")); - - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); - - AMQShortString exchangeName = new AMQShortString(exchangeNameString); - - - Exchange exchange; - - - - synchronized (exchangeRegistry) - { - exchange = exchangeRegistry.getExchange(exchangeName); - if(exchange == null) - { - - AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct")); - boolean durable = exchangeConfiguration.getBoolean("durable",false); - boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false); - - Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); - exchangeRegistry.registerExchange(newExchange); - } - - } + return _name; } - public static CompositeConfiguration getDefaultQueueConfiguration(VirtualHost host) + public long getHousekeepingExpiredMessageCheckPeriod() { - CompositeConfiguration queueConfiguration = null; - if (_config == null) - return null; - - Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + host.getName()); - - if (vHostConfiguration == null) - return null; - - Configuration defaultQueueConfiguration = vHostConfiguration.subset("queues"); - if (defaultQueueConfiguration != null) - { - queueConfiguration = new CompositeConfiguration(); - queueConfiguration.addConfiguration(defaultQueueConfiguration); - } - - return queueConfiguration; + return _config.getLong("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingExpiredMessageCheckPeriod()); } - private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException + public String getAuthenticationDatabase() { - CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - - queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString)); - queueConfiguration.addConfiguration(configuration.subset("queues")); - - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - RoutingTable routingTable = virtualHost.getRoutingTable(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - - - AMQShortString queueName = new AMQShortString(queueNameString); - - AMQQueue queue; - - synchronized (queueRegistry) - { - queue = queueRegistry.getQueue(queueName); - - if (queue == null) - { - _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName()); - - boolean durable = queueConfiguration.getBoolean("durable" ,false); - boolean autodelete = queueConfiguration.getBoolean("autodelete", false); - String owner = queueConfiguration.getString("owner", null); - FieldTable arguments = null; - boolean priority = queueConfiguration.getBoolean("priority", false); - int priorities = queueConfiguration.getInt("priorities", -1); - if(priority || priorities > 0) - { - if(arguments == null) - { - arguments = new FieldTable(); - } - if (priorities < 0) - { - priorities = 10; - } - arguments.put(new AMQShortString("x-qpid-priorities"), priorities); - } - - - queue = AMQQueueFactory.createAMQQueueImpl(queueName, - durable, - owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */, - autodelete /* Therefore autodelete makes no sence */, - virtualHost, - arguments, - queueConfiguration); - - if (queue.isDurable()) - { - routingTable.createQueue(queue); - } - - queueRegistry.registerQueue(queue); - } - else - { - _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating."); - } - - String exchangeName = queueConfiguration.getString("exchange", null); - - Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - - if(exchange == null) - { - exchange = virtualHost.getExchangeRegistry().getDefaultExchange(); - } - - if (exchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); - } - - synchronized (exchange) - { - List routingKeys = queueConfiguration.getList("routingKey"); - if(routingKeys == null || routingKeys.isEmpty()) - { - routingKeys = Collections.singletonList(queue.getName()); - } - - for(Object routingKeyNameObj : routingKeys) - { - AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - - - queue.bind(exchange, routingKey, null); - + return _config.getString("security.authentication.name"); + } - _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); - } + public List getCustomExchanges() + { + return _config.getList("custom-exchanges.class-name"); + } - if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange()) - { - queue.bind(virtualHost.getExchangeRegistry().getDefaultExchange(), queue.getName(), null); - } - } + public SecurityConfiguration getSecurityConfiguration() + { + return new SecurityConfiguration(_config.subset("security")); + } - } + public Configuration getStoreConfiguration() + { + return _config.subset("store"); } + public String getRoutingTableClass() + { + return _config.getString("routingtable.class"); + } - public void performBindings() throws AMQException, ConfigurationException + public String getTransactionLogClass() { - List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name"); - String defaultVirtualHostName = _config.getString("default"); - if(defaultVirtualHostName != null) - { - ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName); - } - _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames); + return _config.getString("store.class", MemoryMessageStore.class.getName()); + } - for(Object nameObject : virtualHostNames) - { - String name = String.valueOf(nameObject); - configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name)); - } + public List getExchanges() + { + return _config.getList("exchanges.exchange.name"); + } - if (virtualHostNames == null || virtualHostNames.isEmpty()) - { - throw new ConfigurationException( - "Virtualhost Configuration document does not contain a valid virtualhost."); - } + public ExchangeConfiguration getExchangeConfiguration(String exchangeName) + { + return _exchanges.get(exchangeName); } + public String[] getQueueNames() + { + return _queues.keySet().toArray(new String[_queues.size()]); + } + public QueueConfiguration getQueueConfiguration(String queueName) + { + return _queues.get(queueName); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 9d4c090971..c04b6c559c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -25,11 +25,11 @@ import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -73,7 +73,8 @@ public class DefaultExchangeFactory implements ExchangeFactory return e; } - public void initialise(Configuration hostConfig) + @Override + public void initialise(VirtualHostConfiguration hostConfig) { if (hostConfig == null) @@ -81,7 +82,7 @@ public class DefaultExchangeFactory implements ExchangeFactory return; } - for(Object className : hostConfig.getList("custom-exchanges.class-name")) + for(Object className : hostConfig.getCustomExchanges()) { try { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index 0bcfec7181..2f76d41228 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -26,6 +26,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; public interface ExchangeFactory @@ -34,7 +35,7 @@ public interface ExchangeFactory int ticket) throws AMQException; - void initialise(Configuration hostConfig); + void initialise(VirtualHostConfiguration hostConfig); Collection<ExchangeType<? extends Exchange>> getRegisteredTypes(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 621003be90..a2a6faf21b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -25,14 +25,16 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; @@ -92,7 +94,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF, ConnectionStartOkMethodHandler.getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID())); disposeSaslServer(session); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index f53e56601b..6698ae888a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -23,18 +23,19 @@ package org.apache.qpid.server.handler; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.HeartbeatConfig; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; @@ -47,8 +48,6 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler(); - private static final int DEFAULT_FRAME_SIZE = 65536; - public static ConnectionStartOkMethodHandler getInstance() { return _instance; @@ -117,7 +116,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF, getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); break; case CONTINUE: @@ -153,8 +152,8 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< static int getConfiguredFrameSize() { - final Configuration config = ApplicationRegistry.getInstance().getConfiguration(); - final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE); + final ServerConfiguration config = ApplicationRegistry.getInstance().getConfiguration(); + final int framesize = config.getFrameSize(); _logger.info("Framesize set to " + framesize); return framesize; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b1e02aef7a..7f500cfb8a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -20,27 +20,28 @@ */ package org.apache.qpid.server.handler; -import java.util.concurrent.atomic.AtomicInteger; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.configuration.Configured; - -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.routing.RoutingTable; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -53,17 +54,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return _instance; } - @Configured(path = "queue.auto_register", defaultValue = "true") - public boolean autoRegister; + public boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister(); private final AtomicInteger _counter = new AtomicInteger(); - - protected QueueDeclareHandler() - { - Configurator.configure(this); - } - public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java index a0d2c7dc66..aea2f9d872 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java @@ -32,9 +32,12 @@ import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator; import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser; import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser; +import javax.management.InstanceNotFoundException; import javax.management.JMException; +import javax.management.MBeanRegistrationException; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; +import javax.management.ObjectName; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; import javax.management.remote.JMXServiceURL; @@ -85,7 +88,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); // Retrieve the config parameters - boolean platformServer = appRegistry.getConfiguration().getBoolean("management.platform-mbeanserver", true); + boolean platformServer = appRegistry.getConfiguration().getPlatformMbeanserver(); _mbeanServer = platformServer ? ManagementFactory.getPlatformMBeanServer() @@ -105,11 +108,11 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - boolean jmxmpSecurity = appRegistry.getConfiguration().getBoolean("management.security-enabled", false); - int port = appRegistry.getConfiguration().getInt(MANAGEMENT_PORT_CONFIG_PATH, MANAGEMENT_PORT_DEFAULT); + boolean jmxmpSecurity = appRegistry.getConfiguration().getManagementSecurityEnabled(); + int port = appRegistry.getConfiguration().getJMXManagementPort(); //retrieve the Principal Database assigned to JMX authentication duties - String jmxDatabaseName = appRegistry.getConfiguration().getString("security.jmx.principal-database"); + String jmxDatabaseName = appRegistry.getConfiguration().getJMXPrincipalDatabase(); Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases(); PrincipalDatabase db = map.get(jmxDatabaseName); @@ -154,7 +157,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry RMIServerSocketFactory ssf; //check ssl enabled option in config, default to true if option is not set - boolean sslEnabled = appRegistry.getConfiguration().getBoolean("management.ssl.enabled", true); + boolean sslEnabled = appRegistry.getConfiguration().getManagementSSLEnabled(); if (sslEnabled) { @@ -167,7 +170,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry keyStorePath = System.getProperty("javax.net.ssl.keyStore"); } else{ - keyStorePath = appRegistry.getConfiguration().getString("management.ssl.keyStorePath", null); + keyStorePath = appRegistry.getConfiguration().getManagementKeyStorePath(); } //check the keystore path value is valid @@ -202,7 +205,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry if (System.getProperty("javax.net.ssl.keyStorePassword") == null) { - if (appRegistry.getConfiguration().getString("management.ssl.keyStorePassword") == null) + if (appRegistry.getConfiguration().getManagementKeyStorePassword() == null) { throw new ConfigurationException("JMX management SSL keystore password not defined, " + "unable to start requested SSL protected JMX server"); @@ -210,7 +213,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry else { System.setProperty("javax.net.ssl.keyStorePassword", - appRegistry.getConfiguration().getString("management.ssl.keyStorePassword")); + appRegistry.getConfiguration().getManagementKeyStorePassword()); } } @@ -379,6 +382,17 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry // Stopping the RMI registry UnicastRemoteObject.unexportObject(_rmiRegistry, true); } + for (ObjectName name : _mbeanServer.queryNames(null, null)) + { + try + { + _mbeanServer.unregisterMBean(name); + } + catch (JMException e) + { + // Really shouldn't happen, but we'll ignore that... + } + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 4b69842c49..205ca73f13 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -582,7 +582,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (delay > 0) { _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay)); + _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index d8dbf97e49..0dbefd8798 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; @@ -34,15 +37,19 @@ import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; import org.apache.qpid.ssl.SSLContextFactory; -import java.io.IOException; -import java.net.InetSocketAddress; - /** * The protocol handler handles "protocol events" for all connections. The state * associated with an individual connection is accessed through the protocol session. @@ -56,9 +63,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter private final IApplicationRegistry _applicationRegistry; - private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144"; - private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144"; - private final int BUFFER_READ_LIMIT_SIZE; private final int BUFFER_WRITE_LIMIT_SIZE; @@ -72,8 +76,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter _applicationRegistry = applicationRegistry; // Read the configuration from the application registry - BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE)); - BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE)); + BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit(); + BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit(); _logger.debug("AMQPFastProtocolHandler created"); } @@ -92,17 +96,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); - - ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). - getConfiguredObject(ConnectorConfiguration.class); - if (connectorConfig.enableExecutorPool) + final ServerConfiguration config = _applicationRegistry.getConfiguration(); + + String keystorePath = config.getKeystorePath(); + String keystorePassword = config.getKeystorePassword(); + String certType = config.getCertType(); + SSLContextFactory sslContextFactory = null; + boolean isSsl = false; + if (config.getEnableSSL() && isSSLClient(config, protocolSession)) { - if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession)) + sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + isSsl = true; + } + if (config.getEnableExecutorPool()) + { + if (isSsl) { - String keystorePath = connectorConfig.keystorePath; - String keystorePassword = connectorConfig.keystorePassword; - String certType = connectorConfig.certType; - SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", new SSLFilter(sslContextFactory.buildServerContext())); } @@ -111,19 +120,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter else { protocolSession.getFilterChain().addLast("protocolFilter", pcf); - if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession)) + if (isSsl) { - String keystorePath = connectorConfig.keystorePath; - String keystorePassword = connectorConfig.keystorePassword; - String certType = connectorConfig.certType; - SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", new SSLFilter(sslContextFactory.buildServerContext())); } - } - if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false)) + if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled()) { try { @@ -271,10 +275,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } } - protected boolean isSSLClient(ConnectorConfiguration connectionConfig, + protected boolean isSSLClient(ServerConfiguration connectionConfig, IoSession protocolSession) { InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress(); - return addr.getPort() == connectionConfig.sslPort; + return addr.getPort() == connectionConfig.getSSLPort(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/HeartbeatConfig.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/HeartbeatConfig.java deleted file mode 100644 index 310deaaf55..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/HeartbeatConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.server.registry.ApplicationRegistry; - -public class HeartbeatConfig -{ - @Configured(path = "heartbeat.delay", defaultValue = "5") - public int delay = 5;//in secs - @Configured(path = "heartbeat.timeoutFactor", defaultValue = "2.0") - public double timeoutFactor = 2; - - public double getTimeoutFactor() - { - return timeoutFactor; - } - - public void setTimeoutFactor(double timeoutFactor) - { - this.timeoutFactor = timeoutFactor; - } - - public int getDelay() - { - return delay; - } - - public void setDelay(int delay) - { - this.delay = delay; - } - - int getTimeout(int writeDelay) - { - return (int) (timeoutFactor * writeDelay); - } - - public static HeartbeatConfig getInstance() - { - return ApplicationRegistry.getInstance().getConfiguredObject(HeartbeatConfig.class); - } - - public String toString() - { - return "HeartBeatConfig{delay = " + delay + " timeoutFactor = " + timeoutFactor + "}"; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 2d4a9c4b40..0838b71c54 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -20,9 +20,10 @@ */ package org.apache.qpid.server.queue; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.subscription.Subscription; @@ -141,6 +142,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> long getMinimumAlertRepeatGap(); + void setMinimumAlertRepeatGap(long value); + void deleteMessageFromTop(StoreContext storeContext) throws AMQException; @@ -162,7 +165,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void stop(); - /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. @@ -210,6 +212,4 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> { public void doTask(AMQQueue queue) throws AMQException; } - - void configure(Configuration virtualHostDefaultQueueConfiguration); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index be8c19d18f..eb0a011e93 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,12 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.AMQException; public class AMQQueueFactory @@ -33,25 +32,10 @@ public class AMQQueueFactory public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); public static AMQQueue createAMQQueueImpl(AMQShortString name, - boolean durable, - AMQShortString owner, - boolean autoDelete, - VirtualHost virtualHost, final FieldTable arguments) - - throws AMQException - { - - return createAMQQueueImpl(name, durable, owner, autoDelete, - virtualHost, arguments, - VirtualHostConfiguration.getDefaultQueueConfiguration(virtualHost)); - } - - public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, - VirtualHost virtualHost, final FieldTable arguments, - Configuration queueConfiguration) + VirtualHost virtualHost, final FieldTable arguments) throws AMQException { @@ -66,13 +50,41 @@ public class AMQQueueFactory { q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost); } - if (q != null && queueConfiguration != null) - { - q.configure(queueConfiguration); - } //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); return q; } + + public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException + { + AMQShortString queueName = new AMQShortString(config.getName()); + + boolean durable = config.getDurable(); + boolean autodelete = config.getAutoDelete(); + AMQShortString owner = (config.getOwner() != null) ? new AMQShortString(config.getOwner()) : null; + FieldTable arguments = null; + boolean priority = config.getPriority(); + int priorities = config.getPriorities(); + if(priority || priorities > 0) + { + if(arguments == null) + { + arguments = new FieldTable(); + } + if (priorities < 0) + { + priorities = 10; + } + arguments.put(new AMQShortString("x-qpid-priorities"), priorities); + } + + AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments); + q.setMaximumMessageAge(config.getMaximumMessageAge()); + q.setMaximumQueueDepth(config.getMaximumQueueDepth()); + q.setMaximumMessageSize(config.getMaximumMessageSize()); + q.setMaximumMessageCount(config.getMaximumMessageCount()); + q.setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + return q; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AsyncDeliveryConfig.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AsyncDeliveryConfig.java deleted file mode 100644 index 290fedcf7b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AsyncDeliveryConfig.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -import org.apache.qpid.configuration.Configured; -import org.apache.qpid.server.registry.ApplicationRegistry; - -public class AsyncDeliveryConfig -{ - private Executor _executor; - - @Configured(path = "delivery.poolsize", defaultValue = "0") - public int poolSize; - - public Executor getExecutor() - { - if (_executor == null) - { - if (poolSize > 0) - { - _executor = Executors.newFixedThreadPool(poolSize); - } - else - { - _executor = Executors.newCachedThreadPool(); - } - } - return _executor; - } - - public static Executor getAsyncDeliveryExecutor() - { - return ApplicationRegistry.getInstance().getConfiguredObject(AsyncDeliveryConfig.class).getExecutor(); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 94580a00ac..f7ae04c528 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -43,7 +43,7 @@ public class IncomingMessage implements Filterable<RuntimeException> private static final Logger _logger = Logger.getLogger(IncomingMessage.class); private static final boolean SYNCHED_CLOCKS = - ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false); + ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks(); private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; @@ -51,7 +51,7 @@ public class IncomingMessage implements Filterable<RuntimeException> private final TransactionalContext _txnContext; private static final boolean MSG_AUTH = - ApplicationRegistry.getInstance().getConfiguration().getBoolean("security.msg-auth", false); + ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a08e4e2667..42f52ce730 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,28 +1,9 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.AMQException; -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.configuration.Configured; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; - -import javax.management.JMException; -import java.util.List; -import java.util.Set; import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,6 +11,25 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.management.JMException; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.pool.ReadWriteRunnable; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -91,24 +91,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicLong _totalMessagesReceived = new AtomicLong(); /** max allowed size(KB) of a single message */ - @Configured(path = "maximumMessageSize", defaultValue = "0") - public long _maximumMessageSize; + public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); /** max allowed number of messages on a queue. */ - @Configured(path = "maximumMessageCount", defaultValue = "0") - public long _maximumMessageCount; + public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); /** max queue depth for the queue */ - @Configured(path = "maximumQueueDepth", defaultValue = "0") - public long _maximumQueueDepth; + public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); /** maximum message age before alerts occur */ - @Configured(path = "maximumMessageAge", defaultValue = "0") - public long _maximumMessageAge; + public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); - /** the minimum interval between sending out consequetive alerts of the same type */ - @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") - public long _minimumAlertRepeatGap; + /** the minimum interval between sending out consecutive alerts of the same type */ + public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); private static final int MAX_ASYNC_DELIVERIES = 10; @@ -167,7 +162,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - private void resetNotifications() + public void resetNotifications() { // This ensure that the notification checks for the configured alerts are created. setMaximumMessageAge(_maximumMessageAge); @@ -1590,10 +1585,4 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } return ids; } - - public void configure(Configuration queueConfiguration) - { - Configurator.configure(this, queueConfiguration); - resetNotifications(); - } -}
\ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 02124a3737..477beeadcb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -20,24 +20,20 @@ */ package org.apache.qpid.server.registry; -import org.apache.commons.configuration.Configuration; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.mina.common.IoAcceptor; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.management.ManagedObjectRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; -import org.apache.qpid.server.security.access.ACLPlugin; -import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.plugins.PluginManager; -import org.apache.mina.common.IoAcceptor; - -import java.util.HashMap; -import java.util.Map; -import java.net.InetSocketAddress; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; /** * An abstract application registry that provides access to configuration information and handles the @@ -53,7 +49,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private final Map<Class<?>, Object> _configuredObjects = new HashMap<Class<?>, Object>(); - protected final Configuration _configuration; + protected final ServerConfiguration _configuration; public static final int DEFAULT_INSTANCE = 1; public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry"; @@ -154,7 +150,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - protected ApplicationRegistry(Configuration configuration) + protected ApplicationRegistry(ServerConfiguration configuration) { _configuration = configuration; } @@ -242,7 +238,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - public Configuration getConfiguration() + public ServerConfiguration getConfiguration() { return _configuration; } @@ -255,26 +251,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - public <T> T getConfiguredObject(Class<T> instanceType) - { - T instance = (T) _configuredObjects.get(instanceType); - if (instance == null) - { - try - { - instance = instanceType.newInstance(); - } - catch (Exception e) - { - _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); - throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); - } - Configurator.configure(instance); - _configuredObjects.put(instanceType, instance); - } - return instance; - } - public static void setDefaultApplicationRegistry(String clazz) { _APPLICATION_REGISTRY = clazz; @@ -287,7 +263,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public ACLManager getAccessManager() { - return new ACLManager(_configuration, _pluginManager); + return new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager); } public ManagedObjectRegistry getManagedObjectRegistry() diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index c34c4bf80a..39164883f9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -21,71 +21,25 @@ package org.apache.qpid.server.registry; import java.io.File; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.SystemConfiguration; -import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.management.JMXManagedObjectRegistry; -import org.apache.qpid.server.management.ManagedObjectRegistry; -import org.apache.qpid.server.management.ManagementConfiguration; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.access.ACLPlugin; -import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.AMQException; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { - super(config(configurationURL)); - } - - // Our configuration class needs to make the interpolate method - // public so it can be called below from the config method. - private static class MyConfiguration extends CompositeConfiguration - { - public String interpolate(String obj) - { - return super.interpolate(obj); - } - } - - private static final Configuration config(File url) throws ConfigurationException - { - // We have to override the interpolate methods so that - // interpolation takes place accross the entirety of the - // composite configuration. Without doing this each - // configuration object only interpolates variables defined - // inside itself. - final MyConfiguration conf = new MyConfiguration(); - conf.addConfiguration(new SystemConfiguration() - { - protected String interpolate(String o) - { - return conf.interpolate(o); - } - }); - conf.addConfiguration(new XMLConfiguration(url) - { - protected String interpolate(String o) - { - return conf.interpolate(o); - } - }); - return conf; + super(new ServerConfiguration(configurationURL)); } public void initialise() throws Exception @@ -94,9 +48,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _virtualHostRegistry = new VirtualHostRegistry(); - _pluginManager = new PluginManager(_configuration.getString("plugin-directory")); + _pluginManager = new PluginManager(_configuration.getPluginDirectory()); - _accessManager = new ACLManager(_configuration, _pluginManager); + _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager); _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration); @@ -111,18 +65,17 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry } private void initialiseVirtualHosts() throws Exception - { - for (String name : getVirtualHostNames()) + { + for (String name : _configuration.getVirtualHosts()) { - - _virtualHostRegistry.registerVirtualHost(new VirtualHost(name, getConfiguration().subset("virtualhosts.virtualhost." + name))); + _virtualHostRegistry.registerVirtualHost(new VirtualHost(_configuration.getVirtualHostConfig(name))); } + getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost()); } private void initialiseManagedObjectRegistry() throws AMQException { - ManagementConfiguration config = getConfiguredObject(ManagementConfiguration.class); - if (config.enabled) + if (_configuration.getManagementEnabled()) { _managedObjectRegistry = new JMXManagedObjectRegistry(); } @@ -131,10 +84,4 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _managedObjectRegistry = new NoopManagedObjectRegistry(); } } - - public Collection<String> getVirtualHostNames() - { - return getConfiguration().getList("virtualhosts.virtualhost.name"); - } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index e68dca285c..a1f30c6eed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.net.InetSocketAddress; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; @@ -49,21 +50,11 @@ public interface IApplicationRegistry void close() throws Exception; /** - * This gets access to a "configured object". A configured object has fields populated from a the configuration - * object (Commons Configuration) automatically, where it has the appropriate attributes defined on fields. - * Application registry implementations can choose the refresh strategy or caching approach. - * @param instanceType the type of object you want initialised. This must be unique - i.e. you can only - * have a single object of this type in the system. - * @return the configured object - */ - <T> T getConfiguredObject(Class<T> instanceType); - - /** * Get the low level configuration. For use cases where the configured object approach is not required * you can get the complete configuration information. * @return a Commons Configuration instance */ - Configuration getConfiguration(); + ServerConfiguration getConfiguration(); ManagedObjectRegistry getManagedObjectRegistry(); @@ -71,8 +62,6 @@ public interface IApplicationRegistry AuthenticationManager getAuthenticationManager(); - Collection<String> getVirtualHostNames(); - VirtualHostRegistry getVirtualHostRegistry(); ACLManager getAccessManager(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java index 6344127b24..0c62638710 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java +++ b/java/broker/src/main/java/org/apache/qpid/server/routing/RoutingTable.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.routing; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -41,7 +42,7 @@ public interface RoutingTable * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; + void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java index 356ee815dd..57c6098874 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java @@ -30,6 +30,9 @@ import java.util.Map.Entry; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.SecurityConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -46,12 +49,12 @@ public class ACLManager private Map<String, ACLPlugin> _globalPlugins = new HashMap<String, ACLPlugin>(); private Map<String, ACLPlugin> _hostPlugins = new HashMap<String, ACLPlugin>(); - public ACLManager(Configuration configuration, PluginManager manager) + public ACLManager(SecurityConfiguration configuration, PluginManager manager) { this(configuration, manager, null); } - public ACLManager(Configuration configuration, PluginManager manager, ACLPluginFactory securityPlugin) + public ACLManager(SecurityConfiguration configuration, PluginManager manager, ACLPluginFactory securityPlugin) { _pluginManager = manager; @@ -70,14 +73,14 @@ public class ACLManager } - public void configureHostPlugins(Configuration hostConfig) + public void configureHostPlugins(SecurityConfiguration hostConfig) { _hostPlugins = configurePlugins(hostConfig); } - public Map<String, ACLPlugin> configurePlugins(Configuration configuration) + public Map<String, ACLPlugin> configurePlugins(SecurityConfiguration hostConfig) { - Configuration securityConfig = configuration.subset("security"); + Configuration securityConfig = hostConfig.getConfiguration(); Map<String, ACLPlugin> plugins = new HashMap<String, ACLPlugin>(); Iterator keys = securityConfig.getKeys(); Collection<String> handledTags = new HashSet(); @@ -86,7 +89,6 @@ public class ACLManager // Splitting the string is necessary here because of the way that getKeys() returns only // bottom level children String tag = ((String) keys.next()).split("\\.", 2)[0]; - if (!handledTags.contains(tag)) { for (ACLPluginFactory plugin : _allSecurityPlugins.values()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java index fc96776a3a..e0d4c49af1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -34,6 +34,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.configuration.PropertyUtils; import org.apache.qpid.configuration.PropertyException; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; @@ -46,20 +47,18 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab { private static final Logger _logger = Logger.getLogger(ConfigurationFilePrincipalDatabaseManager.class); - private static final String _base = "security.principal-databases.principal-database"; - Map<String, PrincipalDatabase> _databases; - public ConfigurationFilePrincipalDatabaseManager(Configuration configuration) throws Exception + public ConfigurationFilePrincipalDatabaseManager(ServerConfiguration _configuration) throws Exception { _logger.info("Initialising PrincipleDatabase authentication manager"); - _databases = initialisePrincipalDatabases(configuration); + _databases = initialisePrincipalDatabases(_configuration); } - private Map<String, PrincipalDatabase> initialisePrincipalDatabases(Configuration configuration) throws Exception + private Map<String, PrincipalDatabase> initialisePrincipalDatabases(ServerConfiguration _configuration) throws Exception { - List<String> databaseNames = configuration.getList(_base + ".name"); - List<String> databaseClasses = configuration.getList(_base + ".class"); + List<String> databaseNames = _configuration.getPrincipalDatabaseNames(); + List<String> databaseClasses = _configuration.getPrincipalDatabaseClass(); Map<String, PrincipalDatabase> databases = new HashMap<String, PrincipalDatabase>(); if (databaseNames.size() == 0) @@ -84,7 +83,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab throw new Exception("Principal databases must implement the PrincipalDatabase interface"); } - initialisePrincipalDatabase((PrincipalDatabase) o, configuration, i); + initialisePrincipalDatabase((PrincipalDatabase) o, _configuration, i); String name = databaseNames.get(i); if ((name == null) || (name.length() == 0)) @@ -105,12 +104,11 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab return databases; } - private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index) + private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, ServerConfiguration _configuration, int index) throws FileNotFoundException, ConfigurationException { - String baseName = _base + "(" + index + ").attributes.attribute."; - List<String> argumentNames = config.getList(baseName + "name"); - List<String> argumentValues = config.getList(baseName + "value"); + List<String> argumentNames = _configuration.getPrincipalDatabaseAttributeNames(index); + List<String> argumentValues = _configuration.getPrincipalDatabaseAttributeValues(index); for (int i = 0; i < argumentNames.size(); i++) { String argName = argumentNames.get(i); @@ -166,18 +164,17 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab return _databases; } - public void initialiseManagement(Configuration config) throws ConfigurationException + public void initialiseManagement(ServerConfiguration config) throws ConfigurationException { try { AMQUserManagementMBean _mbean = new AMQUserManagementMBean(); - String baseSecurity = "security.jmx"; - List<String> principalDBs = config.getList(baseSecurity + ".principal-database"); + List<String> principalDBs = config.getManagementPrincipalDBs(); if (principalDBs.size() == 0) { - throw new ConfigurationException("No principal-database specified for jmx security(" + baseSecurity + ".principal-database)"); + throw new ConfigurationException("No principal-database specified for jmx security"); } String databaseName = principalDBs.get(0); @@ -191,11 +188,11 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab _mbean.setPrincipalDatabase(database); - List<String> jmxaccesslist = config.getList(baseSecurity + ".access"); + List<String> jmxaccesslist = config.getManagementAccessList(); if (jmxaccesslist.size() == 0) { - throw new ConfigurationException("No access control files specified for jmx security(" + baseSecurity + ".access)"); + throw new ConfigurationException("No access control files specified for jmx security"); } String jmxaccesssFile = null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java index 2c553ae76a..f9882f8810 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.security.auth.database; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -30,5 +31,5 @@ public interface PrincipalDatabaseManager { public Map<String, PrincipalDatabase> getDatabases(); - public void initialiseManagement(Configuration config) throws ConfigurationException; + public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java index 6b86a46bd2..4efe381a8b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.server.security.auth.database; -import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.ServerConfiguration; import java.util.Map; import java.util.Properties; @@ -41,7 +42,8 @@ public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseMana return _databases; } - public void initialiseManagement(Configuration config) + @Override + public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException { //todo } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 2cbbdc85ff..98c060599a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.security.auth.manager; import org.apache.log4j.Logger; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; @@ -60,7 +61,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan /** The name for the required SASL Server mechanisms */ public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; - public PrincipalDatabaseAuthenticationManager(String name, Configuration hostConfig) throws Exception + public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception { _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'") + " PrincipleDatabase authentication manager."); @@ -77,7 +78,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan } else { - String databaseName = hostConfig.getString("security.authentication.name"); + String databaseName = hostConfig.getAuthenticationDatabase(); if (databaseName == null) { @@ -121,14 +122,6 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, Map<String, PrincipalDatabase> databases) throws Exception { -// Configuration config = ApplicationRegistry.getInstance().getConfiguration(); -// List<String> mechanisms = config.getList("security.sasl.mechanisms.mechanism.initialiser.class"); -// -// // Maps from the mechanism to the properties used to initialise the server. See the method -// // Sasl.createSaslServer for details of the use of these properties. This map is populated during initialisation -// // of each provider. - - if (databases.size() > 1) { _logger.warn("More than one principle database provided currently authentication mechanism will override each other."); @@ -136,13 +129,11 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan for (Map.Entry<String, PrincipalDatabase> entry : databases.entrySet()) { - // fixme As the database now provide the mechanisms they support, they will ... // overwrite each other in the map. There should only be one database per vhost. // But currently we must have authentication before vhost definition. initialiseAuthenticationMechanisms(providerMap, entry.getValue()); } - } private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 500fd4c7bf..35d8d9ea6c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -142,7 +143,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable private State _state = State.INITIAL; - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { //Only initialise when loaded with the old 'store' confing ignore the new 'RoutingTable' config if (base.equals("store")) @@ -156,7 +157,7 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); + final String databasePath = config.getStoreConfiguration().getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); File environmentPath = new File(databasePath); if (!environmentPath.exists()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index eee7be7ef6..aa7b6e2542 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -70,19 +71,19 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY); } - public void configure(String base, Configuration config) + public void configure(String base, VirtualHostConfiguration config) { //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable' if (base.equals("store")) { - int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); + int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); _log.info("Using capacity " + hashtableCapacity + " for hash tables"); _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity); } } - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { configure(base, config); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java index c927bb3272..4fa85dd6cd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transactionlog/TransactionLog.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; @@ -67,7 +68,7 @@ public interface TransactionLog * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; + void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java deleted file mode 100644 index b67bb98e28..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.transport; - -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.qpid.configuration.Configured; -import org.apache.log4j.Logger; - -public class ConnectorConfiguration -{ - private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class); - - public static final String DEFAULT_PORT = "5672"; - - public static final String SSL_PORT = "8672"; - - @Configured(path = "connector.processors", - defaultValue = "4") - public int processors; - - @Configured(path = "connector.port", - defaultValue = DEFAULT_PORT) - public int port; - - @Configured(path = "connector.bind", - defaultValue = "wildcard") - public String bindAddress; - - @Configured(path = "connector.socketReceiveBuffer", - defaultValue = "32767") - public int socketReceiveBufferSize; - - @Configured(path = "connector.socketWriteBuffer", - defaultValue = "32767") - public int socketWriteBuferSize; - - @Configured(path = "connector.tcpNoDelay", - defaultValue = "true") - public boolean tcpNoDelay; - - @Configured(path = "advanced.filterchain[@enableExecutorPool]", - defaultValue = "false") - public boolean enableExecutorPool; - - @Configured(path = "advanced.enablePooledAllocator", - defaultValue = "false") - public boolean enablePooledAllocator; - - @Configured(path = "advanced.enableDirectBuffers", - defaultValue = "false") - public boolean enableDirectBuffers; - - @Configured(path = "connector.ssl.enabled", - defaultValue = "false") - public boolean enableSSL; - - @Configured(path = "connector.ssl.sslOnly", - defaultValue = "true") - public boolean sslOnly; - - @Configured(path = "connector.ssl.port", - defaultValue = SSL_PORT) - public int sslPort; - - @Configured(path = "connector.ssl.keystorePath", - defaultValue = "none") - public String keystorePath; - - @Configured(path = "connector.ssl.keystorePassword", - defaultValue = "none") - public String keystorePassword; - - @Configured(path = "connector.ssl.certType", - defaultValue = "SunX509") - public String certType; - - @Configured(path = "connector.qpidnio", - defaultValue = "false") - public boolean _multiThreadNIO; - - @Configured(path = "advanced.useWriteBiasedPool", - defaultValue = "false") - public boolean useBiasedWrites; - - - public IoAcceptor createAcceptor() - { - if (_multiThreadNIO) - { - _logger.warn("Using Qpid Multithreaded IO Processing"); - return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor()); - } - else - { - _logger.warn("Using Mina IO Processing"); - return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); - } - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 88ad87b9c1..eda2d3a94e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -26,7 +26,11 @@ import java.util.HashMap; import java.util.Properties; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.MapConfiguration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -39,17 +43,16 @@ import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class NullApplicationRegistry extends ApplicationRegistry { - public NullApplicationRegistry() + public NullApplicationRegistry() throws ConfigurationException { - super(new MapConfiguration(new HashMap())); + super(new ServerConfiguration(new PropertiesConfiguration())); } public void initialise() throws Exception { _logger.info("Initialising NullApplicationRegistry"); - _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); - _configuration.addProperty("housekeeping.expiredMessageCheckPeriod", "200"); + _configuration.setHousekeepingExpiredMessageCheckPeriod(200); Properties users = new Properties(); @@ -57,17 +60,18 @@ public class NullApplicationRegistry extends ApplicationRegistry _databaseManager = new PropertiesPrincipalDatabaseManager("default", users); - _accessManager = new ACLManager(_configuration, _pluginManager, AllowAll.FACTORY); + _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY); _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); _managedObjectRegistry = new NoopManagedObjectRegistry(); _virtualHostRegistry = new VirtualHostRegistry(); - VirtualHost dummyHost = new VirtualHost("test", _configuration); + PropertiesConfiguration vhostProps = new PropertiesConfiguration(); + VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); + VirtualHost dummyHost = new VirtualHost(hostConfig); _virtualHostRegistry.registerVirtualHost(dummyHost); _virtualHostRegistry.setDefaultVirtualHostName("test"); _pluginManager = new PluginManager(""); - _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 1497b4adb8..9a4a4c3286 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,35 +20,41 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Collections; +import java.util.List; import java.util.Timer; import java.util.TimerTask; import javax.management.NotCompliantMBeanException; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQBrokerManagerMBean; -import org.apache.qpid.server.routing.RoutingTable; -import org.apache.qpid.server.transactionlog.TransactionLog; -import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.configuration.ExchangeConfiguration; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.transactionlog.TransactionLog; public class VirtualHost implements Accessable { @@ -79,9 +85,6 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; - private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; - - public void setAccessableName(String name) { _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" @@ -133,46 +136,31 @@ public class VirtualHost implements Accessable } // End of MBean class /** - * Used for testing only - * @param name - * @param transactionLog - * @throws Exception - */ - public VirtualHost(String name, TransactionLog transactionLog) throws Exception - { - this(name, new PropertiesConfiguration(), transactionLog); - } - - /** * Normal Constructor * @param name * @param hostConfig * @throws Exception */ - public VirtualHost(String name, Configuration hostConfig) throws Exception + public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception { - this(name, hostConfig, null); + this(hostConfig, null); } - public VirtualHost(String name, Configuration hostConfig, TransactionLog transactionLog) throws Exception + public VirtualHost(VirtualHostConfiguration hostConfig, TransactionLog transactionLog) throws Exception { - if (name == null || name.length() == 0) + _name = hostConfig.getName(); + + if (_name == null || _name.length() == 0) { - throw new IllegalArgumentException("Illegal name (" + name + ") for virtualhost."); + throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); } - _name = name; - _virtualHostMBean = new VirtualHostMBean(); _connectionRegistry = new ConnectionRegistry(this); - _houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true); - _queueRegistry = new DefaultQueueRegistry(this); - _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(hostConfig); - _exchangeRegistry = new DefaultExchangeRegistry(this); - + _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true); + if (transactionLog != null) { _transactionLog = transactionLog; @@ -183,33 +171,30 @@ public class VirtualHost implements Accessable } else { - if (hostConfig == null) - { - throw new IllegalAccessException("HostConfig and TransactionLog cannot be null"); - } initialiseTransactionLog(hostConfig); initialiseRoutingTable(hostConfig); } - - + _queueRegistry = new DefaultQueueRegistry(this); + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeFactory.initialise(hostConfig); + _exchangeRegistry = new DefaultExchangeRegistry(this); _exchangeRegistry.initialise(); - _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); + initialiseModel(hostConfig); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig); _accessManager = ApplicationRegistry.getInstance().getAccessManager(); - _accessManager.configureHostPlugins(hostConfig); + _accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration()); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); - initialiseHouseKeeping(hostConfig); + initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); } - private void initialiseHouseKeeping(final Configuration hostConfig) + private void initialiseHouseKeeping(long period) { - - long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD); - /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ if(period != 0L) { @@ -240,9 +225,9 @@ public class VirtualHost implements Accessable } //todo we need to move from store.class to transactionlog.class - private void initialiseTransactionLog(Configuration config) throws Exception + private void initialiseTransactionLog(VirtualHostConfiguration config) throws Exception { - String transactionLogClass = config.getString("store.class"); + String transactionLogClass = config.getTransactionLogClass(); Class clazz = Class.forName(transactionLogClass); Object o = clazz.newInstance(); @@ -257,9 +242,9 @@ public class VirtualHost implements Accessable } //todo we need to move from store.class to transactionlog.class - private void initialiseRoutingTable(Configuration config) throws Exception + private void initialiseRoutingTable(VirtualHostConfiguration hostConfig) throws Exception { - String transactionLogClass = config.getString("routingtable.class"); + String transactionLogClass = hostConfig.getRoutingTableClass(); if (transactionLogClass != null) { @@ -272,7 +257,7 @@ public class VirtualHost implements Accessable " does not."); } _routingTable = (RoutingTable) o; - _routingTable.configure(this, "routingtable", config); + _routingTable.configure(this, "routingtable", hostConfig); } else { @@ -282,24 +267,86 @@ public class VirtualHost implements Accessable } } } + + private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException + { + _logger.debug("Loading configuration for virtualhost: "+config.getName()); + + List exchangeNames = config.getExchanges(); + + for(Object exchangeNameObj : exchangeNames) + { + String exchangeName = String.valueOf(exchangeNameObj); + configureExchange(config.getExchangeConfiguration(exchangeName)); + } + + String[] queueNames = config.getQueueNames(); + + for(Object queueNameObj : queueNames) + { + String queueName = String.valueOf(queueNameObj); + configureQueue(config.getQueueConfiguration(queueName)); + } + } + + private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException + { + AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); + + Exchange exchange; + exchange = _exchangeRegistry.getExchange(exchangeName); + if(exchange == null) + { + AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); + boolean durable = exchangeConfiguration.getDurable(); + boolean autodelete = exchangeConfiguration.getAutoDelete(); + Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); + _exchangeRegistry.registerExchange(newExchange); + } + } - public <T> T getConfiguredObject(Class<T> instanceType, Configuration config) + private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException { - T instance; - try + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + + if (queue.isDurable()) + { + _routingTable.createQueue(queue); + } + + String exchangeName = queueConfiguration.getExchange(); + + Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); + + if(exchange == null) + { + exchange = _exchangeRegistry.getDefaultExchange(); + } + + if (exchange == null) + { + throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); + } + + List routingKeys = queueConfiguration.getRoutingKeys(); + if(routingKeys == null || routingKeys.isEmpty()) { - instance = instanceType.newInstance(); + routingKeys = Collections.singletonList(queue.getName()); } - catch (Exception e) + + for(Object routingKeyNameObj : routingKeys) { - _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); - throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); + AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); + queue.bind(exchange, routingKey, null); + _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); } - Configurator.configure(instance); - return instance; + if(exchange != _exchangeRegistry.getDefaultExchange()) + { + queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null); + } } public String getName() |