diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /java/broker/src/main/java/org/apache/qpid/server/Broker.java | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/Broker.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/Broker.java | 376 |
1 files changed, 85 insertions, 291 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java index d58a0d5bb4..54dcf0543d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -23,37 +23,31 @@ package org.apache.qpid.server; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.*; -import javax.net.ssl.SSLContext; +import java.util.List; +import java.util.Properties; +import java.util.Set; + import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.ConfigurationEntryStore; +import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator; +import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.log4j.LoggingFacade; +import org.apache.qpid.server.logging.log4j.LoggingManagementFacade; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AmqpProtocolVersion; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.QpidAcceptor; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.Transport; - -import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; +import org.apache.qpid.server.registry.IApplicationRegistry; public class Broker { private static final Logger LOGGER = Logger.getLogger(Broker.class); private volatile Thread _shutdownHookThread; + private volatile IApplicationRegistry _applicationRegistry; protected static class InitException extends RuntimeException { @@ -73,7 +67,17 @@ public class Broker } finally { - ApplicationRegistry.remove(); + try + { + if (_applicationRegistry != null) + { + _applicationRegistry.close(); + } + } + finally + { + clearAMQShortStringCache(); + } } } @@ -84,274 +88,76 @@ public class Broker public void startup(final BrokerOptions options) throws Exception { + CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); try { - CurrentActor.set(new BrokerActor(new SystemOutMessageLogger())); startupImpl(options); addShutdownHook(); } finally { - CurrentActor.remove(); + try + { + CurrentActor.remove(); + } + finally + { + clearAMQShortStringCache(); + } } } private void startupImpl(final BrokerOptions options) throws Exception { - final String qpidHome = options.getQpidHome(); - final File configFile = getConfigFile(options.getConfigFile(), - BrokerOptions.DEFAULT_CONFIG_FILE, qpidHome, true); - - CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath())); - - File logConfigFile = getConfigFile(options.getLogConfigFile(), - BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false); - - configureLogging(logConfigFile, options.getLogWatchFrequency()); + final String qpidHome = System.getProperty(BrokerProperties.PROPERTY_QPID_HOME); + String storeLocation = options.getConfigurationStoreLocation(); + String storeType = options.getConfigurationStoreType(); - ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext()); - ServerConfiguration serverConfig = config.getConfiguration(); - if (options.getQpidWork() != null) + if (storeLocation == null) { - serverConfig.setQpidWork(options.getQpidWork()); - } - if (options.getQpidHome() != null) - { - serverConfig.setQpidHome(options.getQpidHome()); - } - updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer()); - - ApplicationRegistry.initialise(config); - - // We have already loaded the BrokerMessages class by this point so we - // need to refresh the locale setting incase we had a different value in - // the configuration. - BrokerMessages.reload(); - - // AR.initialise() sets and removes its own actor so we now need to set the actor - // for the remainder of the startup, and the default actor if the stack is empty - CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger())); - CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger())); - GenericActor.setDefaultMessageLogger(config.getRootMessageLogger()); - - try - { - Set<Integer> ports = new HashSet<Integer>(options.getPorts()); - if(ports.isEmpty()) - { - parsePortList(ports, serverConfig.getPorts()); - } - - Set<Integer> sslPorts = new HashSet<Integer>(options.getSSLPorts()); - if(sslPorts.isEmpty()) - { - parsePortList(sslPorts, serverConfig.getSSLPorts()); - } - - //1-0 excludes and includes - Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0)); - if(exclude_1_0.isEmpty()) - { - parsePortList(exclude_1_0, serverConfig.getPortExclude10()); - } - - Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0)); - if(include_1_0.isEmpty()) - { - parsePortList(include_1_0, serverConfig.getPortInclude10()); - } - - //0-10 excludes and includes - Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10)); - if(exclude_0_10.isEmpty()) - { - parsePortList(exclude_0_10, serverConfig.getPortExclude010()); - } - - Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10)); - if(include_0_10.isEmpty()) - { - parsePortList(include_0_10, serverConfig.getPortInclude010()); - } - - //0-9-1 excludes and includes - Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1)); - if(exclude_0_9_1.isEmpty()) - { - parsePortList(exclude_0_9_1, serverConfig.getPortExclude091()); - } - - Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1)); - if(include_0_9_1.isEmpty()) - { - parsePortList(include_0_9_1, serverConfig.getPortInclude091()); - } - - //0-9 excludes and includes - Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9)); - if(exclude_0_9.isEmpty()) - { - parsePortList(exclude_0_9, serverConfig.getPortExclude09()); - } - - Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9)); - if(include_0_9.isEmpty()) - { - parsePortList(include_0_9, serverConfig.getPortInclude09()); - } - - //0-8 excludes and includes - Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8)); - if(exclude_0_8.isEmpty()) - { - parsePortList(exclude_0_8, serverConfig.getPortExclude08()); - } - - Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8)); - if(include_0_8.isEmpty()) - { - parsePortList(include_0_8, serverConfig.getPortInclude08()); - } - - String bindAddr = options.getBind(); - if (bindAddr == null) - { - bindAddr = serverConfig.getBind(); - } - - InetAddress bindAddress; - if (bindAddr.equals(WILDCARD_ADDRESS)) - { - bindAddress = null; - } - else - { - bindAddress = InetAddress.getByName(bindAddr); - } - - final AmqpProtocolVersion defaultSupportedProtocolReply = serverConfig.getDefaultSupportedProtocolReply(); - - if (!serverConfig.getSSLOnly()) - { - for(int port : ports) - { - final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port); - - final Set<AmqpProtocolVersion> supported = - getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, - include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig); - - final NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); - - final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); - final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply); - - transport.accept(settings, protocolEngineFactory, null); - - ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress, - new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported)); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); - } - } - - if (serverConfig.getEnableSSL()) + String qpidWork = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK); + if (qpidWork == null) { - final String keystorePath = serverConfig.getConnectorKeyStorePath(); - final String keystorePassword = serverConfig.getConnectorKeyStorePassword(); - final String keystoreType = serverConfig.getConnectorKeyStoreType(); - final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm(); - final SSLContext sslContext; - if(serverConfig.getConnectorTrustStorePath()!=null) - { - sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(), - serverConfig.getConnectorTrustStorePassword(), - serverConfig.getConnectorTrustStoreType(), - serverConfig.getConnectorTrustManagerFactoryAlgorithm(), - keystorePath, - keystorePassword, keystoreType, keyManagerFactoryAlgorithm, - serverConfig.getCertAlias()); - } - else - { - sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm); - } - - for(int sslPort : sslPorts) - { - final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort); - - final Set<AmqpProtocolVersion> supported = - getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8, - include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig); - final NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP); - - final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); - final MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply); - - transport.accept(settings, protocolEngineFactory, sslContext); - - ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress, - new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported)); - CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort)); - } + qpidWork = new File(System.getProperty("user.dir"), "work").getAbsolutePath(); } - - CurrentActor.get().message(BrokerMessages.READY()); - } - finally - { - // Startup is complete so remove the AR initialised Startup actor - CurrentActor.remove(); + storeLocation = new File(qpidWork, BrokerOptions.DEFAULT_CONFIG_FILE + "." + storeType).getAbsolutePath(); } - } - private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, - final Set<Integer> exclude_1_0, - final Set<Integer> exclude_0_10, - final Set<Integer> exclude_0_9_1, - final Set<Integer> exclude_0_9, - final Set<Integer> exclude_0_8, - final Set<Integer> include_1_0, - final Set<Integer> include_0_10, - final Set<Integer> include_0_9_1, - final Set<Integer> include_0_9, - final Set<Integer> include_0_8, - final ServerConfiguration serverConfig) - { - final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); + CurrentActor.get().message(BrokerMessages.CONFIG(storeLocation)); - if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port)) - { - supported.remove(AmqpProtocolVersion.v1_0_0); - } + File logConfigFile = getConfigFile(options.getLogConfigFile(), BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false); + configureLogging(logConfigFile, options.getLogWatchFrequency()); - if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_10); - } + BrokerConfigurationStoreCreator storeCreator = new BrokerConfigurationStoreCreator(); + ConfigurationEntryStore store = storeCreator.createStore(storeLocation, storeType, + options.getInitialConfigurationStoreLocation(), options.getInitialConfigurationStoreLocation()); - if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port)) + if (options.isManagementMode()) { - supported.remove(AmqpProtocolVersion.v0_9_1); + store = new ManagementModeStoreHandler(store, options); } - if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port)) + _applicationRegistry = new ApplicationRegistry(store); + try { - supported.remove(AmqpProtocolVersion.v0_9); + _applicationRegistry.initialise(); } - - if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port)) + catch(Exception e) { - supported.remove(AmqpProtocolVersion.v0_8); + try + { + _applicationRegistry.close(); + } + catch(Exception ce) + { + LOGGER.debug("An error occured when closing the registry following initialization failure", ce); + } + throw e; } - return supported; } + private File getConfigFile(final String fileName, final String defaultFileName, final String qpidHome, boolean throwOnFileNotFound) throws InitException @@ -368,11 +174,11 @@ public class Broker if (!configFile.exists() && throwOnFileNotFound) { - String error = "File " + fileName + " could not be found. Check the file exists and is readable."; + String error = "File " + configFile + " could not be found. Check the file exists and is readable."; if (qpidHome == null) { - error = error + "\nNote: " + BrokerOptions.QPID_HOME + " is not set."; + error = error + "\nNote: " + BrokerProperties.PROPERTY_QPID_HOME + " is not set."; } throw new InitException(error, null); @@ -399,37 +205,6 @@ public class Broker } } - /** - * Update the configuration data with the management port. - * @param configuration - * @param registryServerPort The string from the command line - */ - private void updateManagementPorts(ServerConfiguration configuration, Integer registryServerPort, Integer connectorServerPort) - { - if (registryServerPort != null) - { - try - { - configuration.setJMXPortRegistryServer(registryServerPort); - } - catch (NumberFormatException e) - { - throw new InitException("Invalid management (registry server) port: " + registryServerPort, null); - } - } - if (connectorServerPort != null) - { - try - { - configuration.setJMXPortConnectorServer(connectorServerPort); - } - catch (NumberFormatException e) - { - throw new InitException("Invalid management (connector server) port: " + connectorServerPort, null); - } - } - } - private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException { if (logConfigFile.exists() && logConfigFile.canRead()) @@ -443,7 +218,7 @@ public class Broker // log4j expects the watch interval in milliseconds try { - LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); + LoggingManagementFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000); } catch (Exception e) { @@ -454,7 +229,7 @@ public class Broker { try { - LoggingFacade.configure(logConfigFile.getPath()); + LoggingManagementFacade.configure(logConfigFile.getPath()); } catch (Exception e) { @@ -531,6 +306,24 @@ public class Broker LOGGER.debug("Skipping shutdown hook removal as there either isnt one, or we are it."); } } + /** + * Workaround that prevents AMQShortStrings cache from being left in the thread local. This is important + * when embedding the Broker in containers where the starting thread may not belong to Qpid. + * The long term solution here is to stop our use of AMQShortString outside the AMQP transport layer. + */ + private void clearAMQShortStringCache() + { + AMQShortString.clearLocalCache(); + } + + public org.apache.qpid.server.model.Broker getBroker() + { + if (_applicationRegistry == null) + { + return null; + } + return _applicationRegistry.getBroker(); + } private class ShutdownService implements Runnable { @@ -540,4 +333,5 @@ public class Broker Broker.this.shutdown(); } } + } |