diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java | 117 |
1 files changed, 69 insertions, 48 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index 78217c36ac..5dfd841434 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import javax.net.ssl.SSLContext; + import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.QpidLog4JConfigurator; import org.apache.qpid.server.configuration.ServerConfiguration; @@ -45,9 +47,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.management.LoggingManagementMBean; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.transport.QpidAcceptor; @@ -55,7 +56,6 @@ import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.mina.MinaNetworkTransport; public class Broker { @@ -82,7 +82,7 @@ public class Broker startup(new BrokerOptions()); } - public void startup(BrokerOptions options) throws Exception + public void startup(final BrokerOptions options) throws Exception { try { @@ -108,9 +108,9 @@ public class Broker configureLogging(logConfigFile, options.getLogWatchFrequency()); - ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); + ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext()); ServerConfiguration serverConfig = config.getConfiguration(); - updateManagementPort(serverConfig, options.getJmxPort()); + updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer()); ApplicationRegistry.initialise(config); @@ -192,57 +192,41 @@ public class Broker { for(int port : ports) { - Set<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); - - if(exclude_0_10.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_10); - } - - if(exclude_0_9_1.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_9_1); - } - if(exclude_0_9.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_9); - } - if(exclude_0_8.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_8); - } - - NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); - - IncomingNetworkTransport transport = new MinaNetworkTransport(); - MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); + final Set<AmqpProtocolVersion> supported = + getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); + final NetworkTransportConfiguration settings = + new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); + + final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); + final MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory(hostName, supported); transport.accept(settings, protocolEngineFactory, null); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(transport,"TCP")); + new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); } } if (serverConfig.getEnableSSL()) { - String keystorePath = serverConfig.getKeystorePath(); - String keystorePassword = serverConfig.getKeystorePassword(); - String certType = serverConfig.getCertType(); - SSLContextFactory sslFactory = - new SSLContextFactory(keystorePath, keystorePassword, certType); + final String keystorePath = serverConfig.getKeystorePath(); + final String keystorePassword = serverConfig.getKeystorePassword(); + final String certType = serverConfig.getCertType(); + final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType); for(int sslPort : sslPorts) { - NetworkTransportConfiguration settings = + final Set<AmqpProtocolVersion> supported = + getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP); - IncomingNetworkTransport transport = new MinaNetworkTransport(); - - transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory); + final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); + final MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory(hostName, supported); + transport.accept(settings, protocolEngineFactory, sslContext); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort), new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort)); @@ -258,6 +242,32 @@ public class Broker } } + private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10, + final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9, + final Set<Integer> exclude_0_8) + { + final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); + + if(exclude_0_10.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_10); + } + if(exclude_0_9_1.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_9_1); + } + if(exclude_0_9.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_9); + } + if(exclude_0_8.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_8); + } + + return supported; + } + private File getConfigFile(final String fileName, final String defaultFileName, final String qpidHome, boolean throwOnFileNotFound) throws InitException @@ -308,19 +318,30 @@ public class Broker /** * Update the configuration data with the management port. * @param configuration - * @param managementPort The string from the command line + * @param registryServerPort The string from the command line */ - private void updateManagementPort(ServerConfiguration configuration, Integer managementPort) + private void updateManagementPorts(ServerConfiguration configuration, Integer registryServerPort, Integer connectorServerPort) { - if (managementPort != null) + if (registryServerPort != null) + { + try + { + configuration.setJMXPortRegistryServer(registryServerPort); + } + catch (NumberFormatException e) + { + throw new InitException("Invalid management (registry server) port: " + registryServerPort, null); + } + } + if (connectorServerPort != null) { try { - configuration.setJMXManagementPort(managementPort); + configuration.setJMXPortConnectorServer(connectorServerPort); } catch (NumberFormatException e) { - throw new InitException("Invalid management port: " + managementPort, null); + throw new InitException("Invalid management (connector server) port: " + connectorServerPort, null); } } } @@ -366,7 +387,7 @@ public class Broker // log4j expects the watch interval in milliseconds try { - QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime); + QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); } catch (Exception e) { |