diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/Broker.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/Broker.java | 144 |
1 files changed, 90 insertions, 54 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java index 5004d320c2..d58a0d5bb4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -20,19 +20,22 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.*; +import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; -import org.apache.log4j.xml.QpidLog4JConfigurator; - import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration; -import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; -import org.apache.qpid.server.information.management.ServerInformationMBean; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.management.LoggingManagementMBean; +import org.apache.qpid.server.logging.log4j.LoggingFacade; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.protocol.AmqpProtocolVersion; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; @@ -46,30 +49,10 @@ import org.apache.qpid.transport.network.Transport; import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; -import javax.net.ssl.SSLContext; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.EnumSet; -import java.util.Formatter; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.logging.ConsoleHandler; -import java.util.logging.FileHandler; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.LogRecord; - public class Broker { private static final Logger LOGGER = Logger.getLogger(Broker.class); - private static final int IPV4_ADDRESS_LENGTH = 4; - private static final char IPV4_LITERAL_SEPARATOR = '.'; private volatile Thread _shutdownHookThread; protected static class InitException extends RuntimeException @@ -128,6 +111,14 @@ public class Broker ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext()); ServerConfiguration serverConfig = config.getConfiguration(); + if (options.getQpidWork() != null) + { + serverConfig.setQpidWork(options.getQpidWork()); + } + if (options.getQpidHome() != null) + { + serverConfig.setQpidHome(options.getQpidHome()); + } updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer()); ApplicationRegistry.initialise(config); @@ -145,14 +136,6 @@ public class Broker try { - configureLoggingManagementMBean(logConfigFile, options.getLogWatchFrequency()); - - ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean(); - configMBean.register(); - - ServerInformationMBean sysInfoMBean = new ServerInformationMBean(config); - sysInfoMBean.register(); - Set<Integer> ports = new HashSet<Integer>(options.getPorts()); if(ports.isEmpty()) { @@ -165,36 +148,71 @@ public class Broker parsePortList(sslPorts, serverConfig.getSSLPorts()); } + //1-0 excludes and includes Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0)); if(exclude_1_0.isEmpty()) { parsePortList(exclude_1_0, serverConfig.getPortExclude10()); } + Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0)); + if(include_1_0.isEmpty()) + { + parsePortList(include_1_0, serverConfig.getPortInclude10()); + } + + //0-10 excludes and includes Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10)); if(exclude_0_10.isEmpty()) { parsePortList(exclude_0_10, serverConfig.getPortExclude010()); } + Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10)); + if(include_0_10.isEmpty()) + { + parsePortList(include_0_10, serverConfig.getPortInclude010()); + } + + //0-9-1 excludes and includes Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1)); if(exclude_0_9_1.isEmpty()) { parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); } + Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1)); + if(include_0_9_1.isEmpty()) + { + parsePortList(include_0_9_1, serverConfig.getPortInclude091()); + } + + //0-9 excludes and includes Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9)); if(exclude_0_9.isEmpty()) { parsePortList(exclude_0_9, serverConfig.getPortExclude09()); } + Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9)); + if(include_0_9.isEmpty()) + { + parsePortList(include_0_9, serverConfig.getPortInclude09()); + } + + //0-8 excludes and includes Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8)); if(exclude_0_8.isEmpty()) { parsePortList(exclude_0_8, serverConfig.getPortExclude08()); } + Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8)); + if(include_0_8.isEmpty()) + { + parsePortList(include_0_8, serverConfig.getPortInclude08()); + } + String bindAddr = options.getBind(); if (bindAddr == null) { @@ -220,8 +238,8 @@ public class Broker final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port); final Set<AmqpProtocolVersion> supported = - getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, - exclude_0_8, serverConfig); + getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, + include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig); final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); @@ -233,7 +251,7 @@ public class Broker transport.accept(settings, protocolEngineFactory, null); ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress, - new QpidAcceptor(transport,"TCP")); + new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported)); CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); } } @@ -242,16 +260,31 @@ public class Broker { final String keystorePath = serverConfig.getConnectorKeyStorePath(); final String keystorePassword = serverConfig.getConnectorKeyStorePassword(); + final String keystoreType = serverConfig.getConnectorKeyStoreType(); final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm(); - final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keyManagerFactoryAlgorithm); + final SSLContext sslContext; + if(serverConfig.getConnectorTrustStorePath()!=null) + { + sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(), + serverConfig.getConnectorTrustStorePassword(), + serverConfig.getConnectorTrustStoreType(), + serverConfig.getConnectorTrustManagerFactoryAlgorithm(), + keystorePath, + keystorePassword, keystoreType, keyManagerFactoryAlgorithm, + serverConfig.getCertAlias()); + } + else + { + sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm); + } for(int sslPort : sslPorts) { final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort); final Set<AmqpProtocolVersion> supported = - getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, - exclude_0_9, exclude_0_8, serverConfig); + getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, + include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig); final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); @@ -262,7 +295,7 @@ public class Broker transport.accept(settings, protocolEngineFactory, sslContext); ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress, - new QpidAcceptor(transport,"TCP")); + new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported)); CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort)); } } @@ -282,27 +315,36 @@ public class Broker final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9, final Set<Integer> exclude_0_8, + final Set<Integer> include_1_0, + final Set<Integer> include_0_10, + final Set<Integer> include_0_9_1, + final Set<Integer> include_0_9, + final Set<Integer> include_0_8, final ServerConfiguration serverConfig) { final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); - if(exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) + if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port)) { supported.remove(AmqpProtocolVersion.v1_0_0); } - if(exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) + + if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port)) { supported.remove(AmqpProtocolVersion.v0_10); } - if(exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) + + if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port)) { supported.remove(AmqpProtocolVersion.v0_9_1); } - if(exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) + + if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port)) { supported.remove(AmqpProtocolVersion.v0_9); } - if(exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) + + if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port)) { supported.remove(AmqpProtocolVersion.v0_8); } @@ -388,7 +430,7 @@ public class Broker } } - private void configureLogging(File logConfigFile, long logWatchTime) throws InitException, IOException + private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException { if (logConfigFile.exists() && logConfigFile.canRead()) { @@ -401,7 +443,7 @@ public class Broker // log4j expects the watch interval in milliseconds try { - QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); + LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); } catch (Exception e) { @@ -412,7 +454,7 @@ public class Broker { try { - QpidLog4JConfigurator.configure(logConfigFile.getPath()); + LoggingFacade.configure(logConfigFile.getPath()); } catch (Exception e) { @@ -446,12 +488,6 @@ public class Broker } } - private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception - { - LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime); - - blm.register(); - } private void addShutdownHook() { |