diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/Main.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/Main.java | 158 |
1 files changed, 44 insertions, 114 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index f3b54034e7..780a17940e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -27,36 +33,23 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.FixedSizeByteBufferAllocator; +import org.apache.mina.common.IoAcceptor; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.AMQException; +import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.management.JMXManagedObjectRegistry; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; import org.apache.qpid.server.protocol.AMQPProtocolProvider; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; -import org.apache.qpid.url.URLSyntaxException; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.List; /** * Main entry point for AMQPD. @@ -200,13 +193,6 @@ public class Main _brokerLogger.error("Initialisation Error : " + e.getMessage()); shutdown(1); } - catch (ConfigurationException e) - { - System.out.println("Error configuring message broker: " + e); - _brokerLogger.error("Error configuring message broker: " + e); - e.printStackTrace(); - shutdown(1); - } catch (Throwable e) { System.out.println("Error initialising message broker: " + e); @@ -223,7 +209,7 @@ public class Main System.exit(status); } - protected void startup() throws InitException, ConfigurationException, Exception + protected void startup() throws Exception { final String QpidHome = System.getProperty(QPID_HOME); final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); @@ -259,40 +245,32 @@ public class Main } ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); - - - updateManagementPort(config.getConfiguration(), commandLine.getOptionValue("m")); - - + ServerConfiguration serverConfig = config.getConfiguration(); + updateManagementPort(serverConfig, commandLine.getOptionValue("m")); ApplicationRegistry.initialise(config); - //fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues // that are causing the broker build to pick up the wrong properties file and hence say // Starting Qpid Client _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); - ConnectorConfiguration connectorConfig = - ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class); - - ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers); + ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers()); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done - if (!connectorConfig.enablePooledAllocator) + if (!serverConfig.getEnablePooledAllocator()) { ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } - - if(connectorConfig.useBiasedWrites) + if(serverConfig.getUseBiasedWrites()) { System.setProperty("org.apache.qpid.use_write_biased_pool","true"); } - int port = connectorConfig.port; + int port = serverConfig.getPort(); String portStr = commandLine.getOptionValue("p"); if (portStr != null) @@ -306,29 +284,8 @@ public class Main throw new InitException("Invalid port: " + portStr, e); } } - - String VIRTUAL_HOSTS = "virtualhosts"; - - Object virtualHosts = ApplicationRegistry.getInstance().getConfiguration().getProperty(VIRTUAL_HOSTS); - - if (virtualHosts != null) - { - if (virtualHosts instanceof Collection) - { - int totalVHosts = ((Collection) virtualHosts).size(); - for (int vhost = 0; vhost < totalVHosts; vhost++) - { - setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost)); - } - } - else - { - setupVirtualHosts(configFile.getParent(), (String) virtualHosts); - } - } - - bind(port, connectorConfig); - + + bind(port, serverConfig); } /** @@ -336,86 +293,59 @@ public class Main * @param configuration * @param managementPort The string from the command line */ - private void updateManagementPort(Configuration configuration, String managementPort) + private void updateManagementPort(ServerConfiguration configuration, String managementPort) { if (managementPort != null) { - int mport; - int defaultMPort = configuration.getInt(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH); try { - mport = Integer.parseInt(managementPort); - configuration.setProperty(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH, mport); + configuration.setJMXManagementPort(Integer.parseInt(managementPort)); } catch (NumberFormatException e) { - _logger.warn("Invalid management port: " + managementPort + " will use default:" + defaultMPort, e); + _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); } } } - protected void setupVirtualHosts(String configFileParent, String configFilePath) - throws ConfigurationException, AMQException, URLSyntaxException - { - String configVar = "${conf}"; - - if (configFilePath.startsWith(configVar)) - { - configFilePath = configFileParent + configFilePath.substring(configVar.length()); - } - - if (configFilePath.indexOf(".xml") != -1) - { - VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath); - vHostConfig.performBindings(); - } - else - { - // the virtualhosts value is a path. Search it for XML files. - - File virtualHostDir = new File(configFilePath); - - String[] fileNames = virtualHostDir.list(); - - for (int each = 0; each < fileNames.length; each++) - { - if (fileNames[each].endsWith(".xml")) - { - VirtualHostConfiguration vHostConfig = - new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]); - vHostConfig.performBindings(); - } - } - } - } - - protected void bind(int port, ConnectorConfiguration connectorConfig) throws BindException + protected void bind(int port, ServerConfiguration config) throws BindException { String bindAddr = commandLine.getOptionValue("b"); if (bindAddr == null) { - bindAddr = connectorConfig.bindAddress; + bindAddr = config.getBind(); } try { - // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); - IoAcceptor acceptor = connectorConfig.createAcceptor(); + IoAcceptor acceptor; + + if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO()) + { + _logger.warn("Using Qpid Multithreaded IO Processing"); + acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor()); + } + else + { + _logger.warn("Using Mina IO Processing"); + acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor()); + } + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); - sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); - sc.setTcpNoDelay(connectorConfig.tcpNoDelay); + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + sc.setSendBufferSize(config.getWriteBufferSize()); + sc.setTcpNoDelay(config.getTcpNoDelay()); // if we do not use the executor pool threading model we get the default leader follower // implementation provided by MINA - if (connectorConfig.enableExecutorPool) + if (config.getEnableExecutorPool()) { sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); } - if (!connectorConfig.enableSSL || !connectorConfig.sslOnly) + if (!config.getEnableSSL() || !config.getSSLOnly()) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); InetSocketAddress bindAddress; @@ -434,16 +364,16 @@ public class Main _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } - if (connectorConfig.enableSSL) + if (config.getEnableSSL()) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); try { - bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + bind(acceptor, new InetSocketAddress(config.getSSLPort()), handler, sconfig); //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); + _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort()); } catch (IOException e) |