summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java117
1 files changed, 69 insertions, 48 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index 78217c36ac..5dfd841434 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -33,6 +33,8 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
+import javax.net.ssl.SSLContext;
+
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.QpidLog4JConfigurator;
import org.apache.qpid.server.configuration.ServerConfiguration;
@@ -45,9 +47,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.transport.QpidAcceptor;
@@ -55,7 +56,6 @@ import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.Transport;
-import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
public class Broker
{
@@ -82,7 +82,7 @@ public class Broker
startup(new BrokerOptions());
}
- public void startup(BrokerOptions options) throws Exception
+ public void startup(final BrokerOptions options) throws Exception
{
try
{
@@ -108,9 +108,9 @@ public class Broker
configureLogging(logConfigFile, options.getLogWatchFrequency());
- ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+ ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
ServerConfiguration serverConfig = config.getConfiguration();
- updateManagementPort(serverConfig, options.getJmxPort());
+ updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer());
ApplicationRegistry.initialise(config);
@@ -192,57 +192,41 @@ public class Broker
{
for(int port : ports)
{
- Set<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
-
- if(exclude_0_10.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_10);
- }
-
- if(exclude_0_9_1.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_9_1);
- }
- if(exclude_0_9.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_9);
- }
- if(exclude_0_8.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_8);
- }
-
- NetworkTransportConfiguration settings =
- new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
-
- IncomingNetworkTransport transport = new MinaNetworkTransport();
- MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(hostName, supported);
+ final Set<AmqpProtocolVersion> supported =
+ getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
+ final NetworkTransportConfiguration settings =
+ new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
+
+ final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
+ final MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(hostName, supported);
transport.accept(settings, protocolEngineFactory, null);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(transport,"TCP"));
+ new QpidAcceptor(transport,"TCP"));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
}
}
if (serverConfig.getEnableSSL())
{
- String keystorePath = serverConfig.getKeystorePath();
- String keystorePassword = serverConfig.getKeystorePassword();
- String certType = serverConfig.getCertType();
- SSLContextFactory sslFactory =
- new SSLContextFactory(keystorePath, keystorePassword, certType);
+ final String keystorePath = serverConfig.getKeystorePath();
+ final String keystorePassword = serverConfig.getKeystorePassword();
+ final String certType = serverConfig.getCertType();
+ final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType);
for(int sslPort : sslPorts)
{
- NetworkTransportConfiguration settings =
+ final Set<AmqpProtocolVersion> supported =
+ getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
+ final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP);
- IncomingNetworkTransport transport = new MinaNetworkTransport();
-
- transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory);
+ final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
+ final MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(hostName, supported);
+ transport.accept(settings, protocolEngineFactory, sslContext);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort),
new QpidAcceptor(transport,"TCP"));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
@@ -258,6 +242,32 @@ public class Broker
}
}
+ private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10,
+ final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9,
+ final Set<Integer> exclude_0_8)
+ {
+ final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
+
+ if(exclude_0_10.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_10);
+ }
+ if(exclude_0_9_1.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_9_1);
+ }
+ if(exclude_0_9.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_9);
+ }
+ if(exclude_0_8.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_8);
+ }
+
+ return supported;
+ }
+
private File getConfigFile(final String fileName,
final String defaultFileName,
final String qpidHome, boolean throwOnFileNotFound) throws InitException
@@ -308,19 +318,30 @@ public class Broker
/**
* Update the configuration data with the management port.
* @param configuration
- * @param managementPort The string from the command line
+ * @param registryServerPort The string from the command line
*/
- private void updateManagementPort(ServerConfiguration configuration, Integer managementPort)
+ private void updateManagementPorts(ServerConfiguration configuration, Integer registryServerPort, Integer connectorServerPort)
{
- if (managementPort != null)
+ if (registryServerPort != null)
+ {
+ try
+ {
+ configuration.setJMXPortRegistryServer(registryServerPort);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid management (registry server) port: " + registryServerPort, null);
+ }
+ }
+ if (connectorServerPort != null)
{
try
{
- configuration.setJMXManagementPort(managementPort);
+ configuration.setJMXPortConnectorServer(connectorServerPort);
}
catch (NumberFormatException e)
{
- throw new InitException("Invalid management port: " + managementPort, null);
+ throw new InitException("Invalid management (connector server) port: " + connectorServerPort, null);
}
}
}
@@ -366,7 +387,7 @@ public class Broker
// log4j expects the watch interval in milliseconds
try
{
- QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime);
+ QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
}
catch (Exception e)
{