diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java | 181 |
1 files changed, 116 insertions, 65 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index e3d8747d72..da26fe1fc4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -20,24 +20,10 @@ */ package org.apache.qpid.server; -import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.logging.*; - -import javax.net.ssl.SSLContext; - +import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.QpidLog4JConfigurator; + import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration; import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; @@ -58,10 +44,33 @@ import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.Transport; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.logging.ConsoleHandler; +import java.util.logging.FileHandler; +import java.util.logging.Formatter; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; + public class Broker { + private static final Logger LOGGER = Logger.getLogger(Broker.class); + private static final int IPV4_ADDRESS_LENGTH = 4; private static final char IPV4_LITERAL_SEPARATOR = '.'; + private volatile Thread _shutdownHookThread; private java.util.logging.Logger FRAME_LOGGER; private java.util.logging.Logger RAW_LOGGER; @@ -79,7 +88,14 @@ public class Broker public void shutdown() { - ApplicationRegistry.remove(); + try + { + removeShutdownHook(); + } + finally + { + ApplicationRegistry.remove(); + } } public void startup() throws Exception @@ -93,6 +109,7 @@ public class Broker { CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); startupImpl(options); + addShutdownHook(); } finally { @@ -185,32 +202,37 @@ public class Broker bindAddr = serverConfig.getBind(); } - InetAddress bindAddress = null; + InetAddress bindAddress; if (bindAddr.equals(WILDCARD_ADDRESS)) { - bindAddress = new InetSocketAddress(0).getAddress(); + bindAddress = null; } else { - bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); + bindAddress = InetAddress.getByName(bindAddr); } - String hostName = bindAddress.getCanonicalHostName(); + + final AmqpProtocolVersion defaultSupportedProtocolReply = serverConfig.getDefaultSupportedProtocolReply(); if (!serverConfig.getSSLOnly()) { for(int port : ports) { + final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port); + final Set<AmqpProtocolVersion> supported = - getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); + getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, serverConfig); + final NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); + new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); + new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply); transport.accept(settings, protocolEngineFactory, null); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + + ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress, new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); } @@ -220,22 +242,25 @@ public class Broker { final String keystorePath = serverConfig.getConnectorKeyStorePath(); final String keystorePassword = serverConfig.getConnectorKeyStorePassword(); - final String certType = serverConfig.getConnectorCertType(); - final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType); + final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm(); + final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keyManagerFactoryAlgorithm); for(int sslPort : sslPorts) { + final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort); + final Set<AmqpProtocolVersion> supported = - getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); + getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, serverConfig); final NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP); + new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); + new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply); transport.accept(settings, protocolEngineFactory, sslContext); - ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort), + + ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress, new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort)); } @@ -252,23 +277,24 @@ public class Broker private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10, final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9, - final Set<Integer> exclude_0_8) + final Set<Integer> exclude_0_8, + final ServerConfiguration serverConfig) { final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); - if(exclude_0_10.contains(port)) + if(exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) { supported.remove(AmqpProtocolVersion.v0_10); } - if(exclude_0_9_1.contains(port)) + if(exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) { supported.remove(AmqpProtocolVersion.v0_9_1); } - if(exclude_0_9.contains(port)) + if(exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) { supported.remove(AmqpProtocolVersion.v0_9); } - if(exclude_0_8.contains(port)) + if(exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) { supported.remove(AmqpProtocolVersion.v0_8); } @@ -354,34 +380,6 @@ public class Broker } } - private byte[] parseIP(String address) throws Exception - { - char[] literalBuffer = address.toCharArray(); - int byteCount = 0; - int currByte = 0; - byte[] ip = new byte[IPV4_ADDRESS_LENGTH]; - for (int i = 0; i < literalBuffer.length; i++) - { - char currChar = literalBuffer[i]; - if ((currChar >= '0') && (currChar <= '9')) - { - currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF); - } - - if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length)) - { - ip[byteCount++] = (byte) currByte; - currByte = 0; - } - } - - if (byteCount != 4) - { - throw new Exception("Invalid IP address: " + address); - } - return ip; - } - private void configureLogging(File logConfigFile, long logWatchTime) throws InitException, IOException { if (logConfigFile.exists() && logConfigFile.canRead()) @@ -447,7 +445,59 @@ public class Broker blm.register(); } - private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException + private void addShutdownHook() + { + Thread shutdownHookThread = new Thread(new ShutdownService()); + shutdownHookThread.setName("QpidBrokerShutdownHook"); + + Runtime.getRuntime().addShutdownHook(shutdownHookThread); + _shutdownHookThread = shutdownHookThread; + + LOGGER.debug("Added shutdown hook"); + } + + private void removeShutdownHook() + { + Thread shutdownThread = _shutdownHookThread; + + //if there is a shutdown thread and we aren't it, we should remove it + if(shutdownThread != null && !(Thread.currentThread() == shutdownThread)) + { + LOGGER.debug("Removing shutdown hook"); + + _shutdownHookThread = null; + + boolean removed = false; + try + { + removed = Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + catch(IllegalStateException ise) + { + //ignore, means the JVM is already shutting down + } + + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Removed shutdown hook: " + removed); + } + } + else + { + LOGGER.debug("Skipping shutdown hook removal as there either isnt one, or we are it."); + } + } + + private class ShutdownService implements Runnable + { + public void run() + { + LOGGER.debug("Shutdown hook running"); + Broker.this.shutdown(); + } + } + + private java.util.logging.Logger updateLogger(final String logType, String logFileName) throws IOException { java.util.logging.Logger logger = java.util.logging.Logger.getLogger(logType); logger.setLevel(Level.FINE); @@ -479,4 +529,5 @@ public class Broker logger.addHandler(handler); return logger; } + } |