summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/Broker.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/Broker.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Broker.java144
1 files changed, 90 insertions, 54 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index 5004d320c2..d58a0d5bb4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -20,19 +20,22 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.*;
+import javax.net.ssl.SSLContext;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.xml.QpidLog4JConfigurator;
-
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
-import org.apache.qpid.server.information.management.ServerInformationMBean;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.management.LoggingManagementMBean;
+import org.apache.qpid.server.logging.log4j.LoggingFacade;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
@@ -46,30 +49,10 @@ import org.apache.qpid.transport.network.Transport;
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.Formatter;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.FileHandler;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
-
public class Broker
{
private static final Logger LOGGER = Logger.getLogger(Broker.class);
- private static final int IPV4_ADDRESS_LENGTH = 4;
- private static final char IPV4_LITERAL_SEPARATOR = '.';
private volatile Thread _shutdownHookThread;
protected static class InitException extends RuntimeException
@@ -128,6 +111,14 @@ public class Broker
ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
ServerConfiguration serverConfig = config.getConfiguration();
+ if (options.getQpidWork() != null)
+ {
+ serverConfig.setQpidWork(options.getQpidWork());
+ }
+ if (options.getQpidHome() != null)
+ {
+ serverConfig.setQpidHome(options.getQpidHome());
+ }
updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer());
ApplicationRegistry.initialise(config);
@@ -145,14 +136,6 @@ public class Broker
try
{
- configureLoggingManagementMBean(logConfigFile, options.getLogWatchFrequency());
-
- ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
- configMBean.register();
-
- ServerInformationMBean sysInfoMBean = new ServerInformationMBean(config);
- sysInfoMBean.register();
-
Set<Integer> ports = new HashSet<Integer>(options.getPorts());
if(ports.isEmpty())
{
@@ -165,36 +148,71 @@ public class Broker
parsePortList(sslPorts, serverConfig.getSSLPorts());
}
+ //1-0 excludes and includes
Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0));
if(exclude_1_0.isEmpty())
{
parsePortList(exclude_1_0, serverConfig.getPortExclude10());
}
+ Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0));
+ if(include_1_0.isEmpty())
+ {
+ parsePortList(include_1_0, serverConfig.getPortInclude10());
+ }
+
+ //0-10 excludes and includes
Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10));
if(exclude_0_10.isEmpty())
{
parsePortList(exclude_0_10, serverConfig.getPortExclude010());
}
+ Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10));
+ if(include_0_10.isEmpty())
+ {
+ parsePortList(include_0_10, serverConfig.getPortInclude010());
+ }
+
+ //0-9-1 excludes and includes
Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1));
if(exclude_0_9_1.isEmpty())
{
parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
}
+ Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1));
+ if(include_0_9_1.isEmpty())
+ {
+ parsePortList(include_0_9_1, serverConfig.getPortInclude091());
+ }
+
+ //0-9 excludes and includes
Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9));
if(exclude_0_9.isEmpty())
{
parsePortList(exclude_0_9, serverConfig.getPortExclude09());
}
+ Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9));
+ if(include_0_9.isEmpty())
+ {
+ parsePortList(include_0_9, serverConfig.getPortInclude09());
+ }
+
+ //0-8 excludes and includes
Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8));
if(exclude_0_8.isEmpty())
{
parsePortList(exclude_0_8, serverConfig.getPortExclude08());
}
+ Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8));
+ if(include_0_8.isEmpty())
+ {
+ parsePortList(include_0_8, serverConfig.getPortInclude08());
+ }
+
String bindAddr = options.getBind();
if (bindAddr == null)
{
@@ -220,8 +238,8 @@ public class Broker
final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port);
final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9,
- exclude_0_8, serverConfig);
+ getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
+ include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig);
final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
@@ -233,7 +251,7 @@ public class Broker
transport.accept(settings, protocolEngineFactory, null);
ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,"TCP"));
+ new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
}
}
@@ -242,16 +260,31 @@ public class Broker
{
final String keystorePath = serverConfig.getConnectorKeyStorePath();
final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
+ final String keystoreType = serverConfig.getConnectorKeyStoreType();
final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm();
- final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keyManagerFactoryAlgorithm);
+ final SSLContext sslContext;
+ if(serverConfig.getConnectorTrustStorePath()!=null)
+ {
+ sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(),
+ serverConfig.getConnectorTrustStorePassword(),
+ serverConfig.getConnectorTrustStoreType(),
+ serverConfig.getConnectorTrustManagerFactoryAlgorithm(),
+ keystorePath,
+ keystorePassword, keystoreType, keyManagerFactoryAlgorithm,
+ serverConfig.getCertAlias());
+ }
+ else
+ {
+ sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm);
+ }
for(int sslPort : sslPorts)
{
final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort);
final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1,
- exclude_0_9, exclude_0_8, serverConfig);
+ getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
+ include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig);
final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
@@ -262,7 +295,7 @@ public class Broker
transport.accept(settings, protocolEngineFactory, sslContext);
ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,"TCP"));
+ new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
}
}
@@ -282,27 +315,36 @@ public class Broker
final Set<Integer> exclude_0_9_1,
final Set<Integer> exclude_0_9,
final Set<Integer> exclude_0_8,
+ final Set<Integer> include_1_0,
+ final Set<Integer> include_0_10,
+ final Set<Integer> include_0_9_1,
+ final Set<Integer> include_0_9,
+ final Set<Integer> include_0_8,
final ServerConfiguration serverConfig)
{
final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
- if(exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled())
+ if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port))
{
supported.remove(AmqpProtocolVersion.v1_0_0);
}
- if(exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled())
+
+ if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_10);
}
- if(exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled())
+
+ if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_9_1);
}
- if(exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled())
+
+ if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_9);
}
- if(exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled())
+
+ if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_8);
}
@@ -388,7 +430,7 @@ public class Broker
}
}
- private void configureLogging(File logConfigFile, long logWatchTime) throws InitException, IOException
+ private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
{
if (logConfigFile.exists() && logConfigFile.canRead())
{
@@ -401,7 +443,7 @@ public class Broker
// log4j expects the watch interval in milliseconds
try
{
- QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
+ LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
}
catch (Exception e)
{
@@ -412,7 +454,7 @@ public class Broker
{
try
{
- QpidLog4JConfigurator.configure(logConfigFile.getPath());
+ LoggingFacade.configure(logConfigFile.getPath());
}
catch (Exception e)
{
@@ -446,12 +488,6 @@ public class Broker
}
}
- private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
- {
- LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
-
- blm.register();
- }
private void addShutdownHook()
{