summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/Broker.java
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
committerKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
commit9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch)
tree2a890e1df09e5b896a9b4168a7b22648f559a1f2 /java/broker/src/main/java/org/apache/qpid/server/Broker.java
parent172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff)
downloadqpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/Broker.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Broker.java376
1 files changed, 85 insertions, 291 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index d58a0d5bb4..54dcf0543d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -23,37 +23,31 @@ package org.apache.qpid.server;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.*;
-import javax.net.ssl.SSLContext;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
+import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.log4j.LoggingFacade;
+import org.apache.qpid.server.logging.log4j.LoggingManagementFacade;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+import org.apache.qpid.server.registry.IApplicationRegistry;
public class Broker
{
private static final Logger LOGGER = Logger.getLogger(Broker.class);
private volatile Thread _shutdownHookThread;
+ private volatile IApplicationRegistry _applicationRegistry;
protected static class InitException extends RuntimeException
{
@@ -73,7 +67,17 @@ public class Broker
}
finally
{
- ApplicationRegistry.remove();
+ try
+ {
+ if (_applicationRegistry != null)
+ {
+ _applicationRegistry.close();
+ }
+ }
+ finally
+ {
+ clearAMQShortStringCache();
+ }
}
}
@@ -84,274 +88,76 @@ public class Broker
public void startup(final BrokerOptions options) throws Exception
{
+ CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
try
{
- CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
startupImpl(options);
addShutdownHook();
}
finally
{
- CurrentActor.remove();
+ try
+ {
+ CurrentActor.remove();
+ }
+ finally
+ {
+ clearAMQShortStringCache();
+ }
}
}
private void startupImpl(final BrokerOptions options) throws Exception
{
- final String qpidHome = options.getQpidHome();
- final File configFile = getConfigFile(options.getConfigFile(),
- BrokerOptions.DEFAULT_CONFIG_FILE, qpidHome, true);
-
- CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
-
- File logConfigFile = getConfigFile(options.getLogConfigFile(),
- BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false);
-
- configureLogging(logConfigFile, options.getLogWatchFrequency());
+ final String qpidHome = System.getProperty(BrokerProperties.PROPERTY_QPID_HOME);
+ String storeLocation = options.getConfigurationStoreLocation();
+ String storeType = options.getConfigurationStoreType();
- ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
- ServerConfiguration serverConfig = config.getConfiguration();
- if (options.getQpidWork() != null)
+ if (storeLocation == null)
{
- serverConfig.setQpidWork(options.getQpidWork());
- }
- if (options.getQpidHome() != null)
- {
- serverConfig.setQpidHome(options.getQpidHome());
- }
- updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer());
-
- ApplicationRegistry.initialise(config);
-
- // We have already loaded the BrokerMessages class by this point so we
- // need to refresh the locale setting incase we had a different value in
- // the configuration.
- BrokerMessages.reload();
-
- // AR.initialise() sets and removes its own actor so we now need to set the actor
- // for the remainder of the startup, and the default actor if the stack is empty
- CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
- CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
- GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
-
- try
- {
- Set<Integer> ports = new HashSet<Integer>(options.getPorts());
- if(ports.isEmpty())
- {
- parsePortList(ports, serverConfig.getPorts());
- }
-
- Set<Integer> sslPorts = new HashSet<Integer>(options.getSSLPorts());
- if(sslPorts.isEmpty())
- {
- parsePortList(sslPorts, serverConfig.getSSLPorts());
- }
-
- //1-0 excludes and includes
- Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0));
- if(exclude_1_0.isEmpty())
- {
- parsePortList(exclude_1_0, serverConfig.getPortExclude10());
- }
-
- Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0));
- if(include_1_0.isEmpty())
- {
- parsePortList(include_1_0, serverConfig.getPortInclude10());
- }
-
- //0-10 excludes and includes
- Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10));
- if(exclude_0_10.isEmpty())
- {
- parsePortList(exclude_0_10, serverConfig.getPortExclude010());
- }
-
- Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10));
- if(include_0_10.isEmpty())
- {
- parsePortList(include_0_10, serverConfig.getPortInclude010());
- }
-
- //0-9-1 excludes and includes
- Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1));
- if(exclude_0_9_1.isEmpty())
- {
- parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
- }
-
- Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1));
- if(include_0_9_1.isEmpty())
- {
- parsePortList(include_0_9_1, serverConfig.getPortInclude091());
- }
-
- //0-9 excludes and includes
- Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9));
- if(exclude_0_9.isEmpty())
- {
- parsePortList(exclude_0_9, serverConfig.getPortExclude09());
- }
-
- Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9));
- if(include_0_9.isEmpty())
- {
- parsePortList(include_0_9, serverConfig.getPortInclude09());
- }
-
- //0-8 excludes and includes
- Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8));
- if(exclude_0_8.isEmpty())
- {
- parsePortList(exclude_0_8, serverConfig.getPortExclude08());
- }
-
- Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8));
- if(include_0_8.isEmpty())
- {
- parsePortList(include_0_8, serverConfig.getPortInclude08());
- }
-
- String bindAddr = options.getBind();
- if (bindAddr == null)
- {
- bindAddr = serverConfig.getBind();
- }
-
- InetAddress bindAddress;
- if (bindAddr.equals(WILDCARD_ADDRESS))
- {
- bindAddress = null;
- }
- else
- {
- bindAddress = InetAddress.getByName(bindAddr);
- }
-
- final AmqpProtocolVersion defaultSupportedProtocolReply = serverConfig.getDefaultSupportedProtocolReply();
-
- if (!serverConfig.getSSLOnly())
- {
- for(int port : ports)
- {
- final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port);
-
- final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
- include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig);
-
- final NetworkTransportConfiguration settings =
- new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
-
- final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
- final MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
-
- transport.accept(settings, protocolEngineFactory, null);
-
- ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
- }
- }
-
- if (serverConfig.getEnableSSL())
+ String qpidWork = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK);
+ if (qpidWork == null)
{
- final String keystorePath = serverConfig.getConnectorKeyStorePath();
- final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
- final String keystoreType = serverConfig.getConnectorKeyStoreType();
- final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm();
- final SSLContext sslContext;
- if(serverConfig.getConnectorTrustStorePath()!=null)
- {
- sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(),
- serverConfig.getConnectorTrustStorePassword(),
- serverConfig.getConnectorTrustStoreType(),
- serverConfig.getConnectorTrustManagerFactoryAlgorithm(),
- keystorePath,
- keystorePassword, keystoreType, keyManagerFactoryAlgorithm,
- serverConfig.getCertAlias());
- }
- else
- {
- sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm);
- }
-
- for(int sslPort : sslPorts)
- {
- final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort);
-
- final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
- include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig);
- final NetworkTransportConfiguration settings =
- new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
-
- final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
- final MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
-
- transport.accept(settings, protocolEngineFactory, sslContext);
-
- ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
- }
+ qpidWork = new File(System.getProperty("user.dir"), "work").getAbsolutePath();
}
-
- CurrentActor.get().message(BrokerMessages.READY());
- }
- finally
- {
- // Startup is complete so remove the AR initialised Startup actor
- CurrentActor.remove();
+ storeLocation = new File(qpidWork, BrokerOptions.DEFAULT_CONFIG_FILE + "." + storeType).getAbsolutePath();
}
- }
- private static Set<AmqpProtocolVersion> getSupportedVersions(final int port,
- final Set<Integer> exclude_1_0,
- final Set<Integer> exclude_0_10,
- final Set<Integer> exclude_0_9_1,
- final Set<Integer> exclude_0_9,
- final Set<Integer> exclude_0_8,
- final Set<Integer> include_1_0,
- final Set<Integer> include_0_10,
- final Set<Integer> include_0_9_1,
- final Set<Integer> include_0_9,
- final Set<Integer> include_0_8,
- final ServerConfiguration serverConfig)
- {
- final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
+ CurrentActor.get().message(BrokerMessages.CONFIG(storeLocation));
- if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v1_0_0);
- }
+ File logConfigFile = getConfigFile(options.getLogConfigFile(), BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false);
+ configureLogging(logConfigFile, options.getLogWatchFrequency());
- if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_10);
- }
+ BrokerConfigurationStoreCreator storeCreator = new BrokerConfigurationStoreCreator();
+ ConfigurationEntryStore store = storeCreator.createStore(storeLocation, storeType,
+ options.getInitialConfigurationStoreLocation(), options.getInitialConfigurationStoreLocation());
- if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port))
+ if (options.isManagementMode())
{
- supported.remove(AmqpProtocolVersion.v0_9_1);
+ store = new ManagementModeStoreHandler(store, options);
}
- if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port))
+ _applicationRegistry = new ApplicationRegistry(store);
+ try
{
- supported.remove(AmqpProtocolVersion.v0_9);
+ _applicationRegistry.initialise();
}
-
- if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port))
+ catch(Exception e)
{
- supported.remove(AmqpProtocolVersion.v0_8);
+ try
+ {
+ _applicationRegistry.close();
+ }
+ catch(Exception ce)
+ {
+ LOGGER.debug("An error occured when closing the registry following initialization failure", ce);
+ }
+ throw e;
}
- return supported;
}
+
private File getConfigFile(final String fileName,
final String defaultFileName,
final String qpidHome, boolean throwOnFileNotFound) throws InitException
@@ -368,11 +174,11 @@ public class Broker
if (!configFile.exists() && throwOnFileNotFound)
{
- String error = "File " + fileName + " could not be found. Check the file exists and is readable.";
+ String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
if (qpidHome == null)
{
- error = error + "\nNote: " + BrokerOptions.QPID_HOME + " is not set.";
+ error = error + "\nNote: " + BrokerProperties.PROPERTY_QPID_HOME + " is not set.";
}
throw new InitException(error, null);
@@ -399,37 +205,6 @@ public class Broker
}
}
- /**
- * Update the configuration data with the management port.
- * @param configuration
- * @param registryServerPort The string from the command line
- */
- private void updateManagementPorts(ServerConfiguration configuration, Integer registryServerPort, Integer connectorServerPort)
- {
- if (registryServerPort != null)
- {
- try
- {
- configuration.setJMXPortRegistryServer(registryServerPort);
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid management (registry server) port: " + registryServerPort, null);
- }
- }
- if (connectorServerPort != null)
- {
- try
- {
- configuration.setJMXPortConnectorServer(connectorServerPort);
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid management (connector server) port: " + connectorServerPort, null);
- }
- }
- }
-
private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
{
if (logConfigFile.exists() && logConfigFile.canRead())
@@ -443,7 +218,7 @@ public class Broker
// log4j expects the watch interval in milliseconds
try
{
- LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
+ LoggingManagementFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
}
catch (Exception e)
{
@@ -454,7 +229,7 @@ public class Broker
{
try
{
- LoggingFacade.configure(logConfigFile.getPath());
+ LoggingManagementFacade.configure(logConfigFile.getPath());
}
catch (Exception e)
{
@@ -531,6 +306,24 @@ public class Broker
LOGGER.debug("Skipping shutdown hook removal as there either isnt one, or we are it.");
}
}
+ /**
+ * Workaround that prevents AMQShortStrings cache from being left in the thread local. This is important
+ * when embedding the Broker in containers where the starting thread may not belong to Qpid.
+ * The long term solution here is to stop our use of AMQShortString outside the AMQP transport layer.
+ */
+ private void clearAMQShortStringCache()
+ {
+ AMQShortString.clearLocalCache();
+ }
+
+ public org.apache.qpid.server.model.Broker getBroker()
+ {
+ if (_applicationRegistry == null)
+ {
+ return null;
+ }
+ return _applicationRegistry.getBroker();
+ }
private class ShutdownService implements Runnable
{
@@ -540,4 +333,5 @@ public class Broker
Broker.this.shutdown();
}
}
+
}