diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-22 20:14:35 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-22 20:14:35 +0000 |
commit | 80f7040fb49757f80b9450bf497cbb1ff4f4f154 (patch) | |
tree | bf5c72e73579d6df3116ad13fa243730ed36a7fb /qpid/java/broker/src/main/java/org | |
parent | 9d88761d6711f7f8722091fcb8849c0e89a8b8c9 (diff) | |
download | qpid-python-80f7040fb49757f80b9450bf497cbb1ff4f4f154.tar.gz |
NO-JIRA : Merge to 1-0 sandbox branch from current HEAD - r1160304
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1160414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main/java/org')
12 files changed, 537 insertions, 212 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java index cdc022ddac..e29b07b3e7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java @@ -34,6 +34,8 @@ import java.util.Properties; import java.util.Set; import java.util.logging.*; +import javax.net.ssl.SSLContext; + import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.QpidLog4JConfigurator; import org.apache.qpid.server.configuration.ServerConfiguration; @@ -46,8 +48,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.management.LoggingManagementMBean; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.transport.QpidAcceptor; @@ -115,7 +117,7 @@ public class Broker configureLogging(logConfigFile, options.getLogWatchFrequency()); - ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); + ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext()); ServerConfiguration serverConfig = config.getConfiguration(); updateManagementPort(serverConfig, options.getJmxPort()); @@ -199,57 +201,41 @@ public class Broker { for(int port : ports) { - Set<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); - - if(exclude_0_10.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_10); - } - - if(exclude_0_9_1.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_9_1); - } - if(exclude_0_9.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_9); - } - if(exclude_0_8.contains(port)) - { - supported.remove(AmqpProtocolVersion.v0_8); - } - - NetworkTransportConfiguration settings = - new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); - - IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); - MultiVersionProtocolEngineFactory protocolEngineFactory = - new MultiVersionProtocolEngineFactory(hostName, supported); + final Set<AmqpProtocolVersion> supported = + getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); + final NetworkTransportConfiguration settings = + new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); + + final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance(); + final MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory(hostName, supported); transport.accept(settings, protocolEngineFactory, null); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(transport,"TCP")); + new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); } } if (serverConfig.getEnableSSL()) { - String keystorePath = serverConfig.getKeystorePath(); - String keystorePassword = serverConfig.getKeystorePassword(); - String certType = serverConfig.getCertType(); - SSLContextFactory sslFactory = - new SSLContextFactory(keystorePath, keystorePassword, certType); + final String keystorePath = serverConfig.getKeystorePath(); + final String keystorePassword = serverConfig.getKeystorePassword(); + final String certType = serverConfig.getCertType(); + final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType); for(int sslPort : sslPorts) { - NetworkTransportConfiguration settings = + final Set<AmqpProtocolVersion> supported = + getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8); + final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP); - IncomingNetworkTransport transport = new MinaNetworkTransport(); - - transport.accept(settings, new MultiVersionProtocolEngineFactory(), sslFactory); + final IncomingNetworkTransport transport = new MinaNetworkTransport(); + final MultiVersionProtocolEngineFactory protocolEngineFactory = + new MultiVersionProtocolEngineFactory(hostName, supported); + transport.accept(settings, protocolEngineFactory, sslContext); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort), new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort)); @@ -265,6 +251,32 @@ public class Broker } } + private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10, + final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9, + final Set<Integer> exclude_0_8) + { + final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class); + + if(exclude_0_10.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_10); + } + if(exclude_0_9_1.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_9_1); + } + if(exclude_0_9.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_9); + } + if(exclude_0_8.contains(port)) + { + supported.remove(AmqpProtocolVersion.v0_8); + } + + return supported; + } + private File getConfigFile(final String fileName, final String defaultFileName, final String qpidHome, boolean throwOnFileNotFound) throws InitException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java index b83da92660..b2d029eb95 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java @@ -26,6 +26,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.osgi.framework.BundleContext; + public class BrokerOptions { /** serialVersionUID */ @@ -51,9 +53,11 @@ public class BrokerOptions private String _logConfigFile; private String _bind; private Integer _jmxPort; + private BundleContext _bundleContext; private Integer _logWatchFrequency = 0; + public void addPort(final int port) { _ports.add(port); @@ -149,4 +153,14 @@ public class BrokerOptions { _logWatchFrequency = logWatchFrequency; } + + public BundleContext getBundleContext() + { + return _bundleContext ; + } + + public void setBundleContext(final BundleContext bundleContext) + { + _bundleContext = bundleContext; + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 0621b87f0a..02f8a346cf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -39,17 +39,13 @@ import org.apache.commons.configuration.HierarchicalConfiguration; import org.apache.commons.configuration.SystemConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.signal.SignalHandlerTask; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkTransportConfiguration; -import sun.misc.Signal; -import sun.misc.SignalHandler; - -public class ServerConfiguration extends ConfigurationPlugin implements SignalHandler +public class ServerConfiguration extends ConfigurationPlugin { protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class); @@ -60,7 +56,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public static final int DEFAULT_FRAME_SIZE = 65536; public static final int DEFAULT_PORT = 5672; - public static final int DEFAULT_SSL_PORT = 8672; + public static final int DEFAULT_SSL_PORT = 5671; public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; public static final int DEFAULT_JMXPORT = 8999; @@ -75,10 +71,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa private File _configFile; private File _vhostsFile; - private Logger _log = Logger.getLogger(this.getClass()); - - private ConfigurationManagementMBean _mbean; - // Map of environment variables to config items private static final Map<String, String> envVarMap = new HashMap<String, String>(); @@ -137,15 +129,26 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa { this(parseConfig(configurationURL)); _configFile = configurationURL; - try + + SignalHandlerTask hupReparseTask = new SignalHandlerTask() { - Signal sig = new sun.misc.Signal("HUP"); - sun.misc.Signal.handle(sig, this); - } - catch (Exception e) + public void handle() + { + try + { + reparseConfigFileSecuritySections(); + } + catch (ConfigurationException e) + { + _logger.error("Could not reload configuration file security sections", e); + } + } + }; + + if(!hupReparseTask.register("HUP")) { - _logger.info("Signal HUP not supported for OS: " + System.getProperty("os.name")); - // We're on something that doesn't handle SIGHUP, how sad, Windows. + _logger.info("Unable to register Signal HUP handler to reload security configuration."); + _logger.info("Signal HUP not supported for this OS / JVM combination - " + SignalHandlerTask.getPlatformDescription()); } } @@ -416,18 +419,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return _configFile == null ? "" : _configFile.getAbsolutePath(); } - public void handle(Signal arg0) - { - try - { - reparseConfigFileSecuritySections(); - } - catch (ConfigurationException e) - { - _logger.error("Could not reload configuration file security sections", e); - } - } - public void reparseConfigFileSecuritySections() throws ConfigurationException { if (_configFile != null) @@ -688,12 +679,12 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public String getKeystorePath() { - return getStringValue("connector.ssl.keystorePath", "none"); + return getStringValue("connector.ssl.keystorePath"); } public String getKeystorePassword() { - return getStringValue("connector.ssl.keystorePassword", "none"); + return getStringValue("connector.ssl.keystorePassword"); } public String getCertType() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtil.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtil.java new file mode 100644 index 0000000000..644f714c8c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtil.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.plugins; + +import java.util.Iterator; +import java.util.Map; + +import org.osgi.framework.Version; + +/** + * Utility class to convert a map of package name to version numbers into the string + * with the format expected of a OSGi system package declaration: + * + * <code> + * org.xyz; version=1.0.0, org.xyz.xyz; version=1.0.0,... + * </code> + * + * Additionally, if the caller has provided a qpidPackageReleaseNumber and the package + * begins org.apache.qpid, this release number will be used, in preference to the one + * found in the Map. + * + * @see org.osgi.framework.Constants#FRAMEWORK_SYSTEMPACKAGES + * + */ +public class OsgiSystemPackageUtil +{ + private static final String APACHE_QPID_PKG_PREFIX = "org.apache.qpid"; + + private final Map<String, String> _packageNameVersionMap; + private final Version _qpidPackageReleaseNumber; + + public OsgiSystemPackageUtil(final Version qpidPackageReleaseNumber, final Map<String, String> packageNameVersionMap) + { + _qpidPackageReleaseNumber = qpidPackageReleaseNumber; + _packageNameVersionMap = packageNameVersionMap; + } + + public String getFormattedSystemPackageString() + { + if (_packageNameVersionMap == null || _packageNameVersionMap.size() == 0) + { + return null; + } + + final StringBuilder packages = new StringBuilder(); + + for(Iterator<String> itr = _packageNameVersionMap.keySet().iterator(); itr.hasNext();) + { + final String packageName = itr.next(); + final String packageVersion; + + if (_qpidPackageReleaseNumber != null && packageName.startsWith(APACHE_QPID_PKG_PREFIX)) + { + packageVersion = _qpidPackageReleaseNumber.toString(); + } + else + { + packageVersion = _packageNameVersionMap.get(packageName); + } + + packages.append(packageName); + packages.append("; "); + packages.append("version="); + packages.append(packageVersion); + + if (itr.hasNext()) + { + packages.append(", "); + } + } + + return packages.toString(); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties new file mode 100644 index 0000000000..aaab4f76cc --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one# +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# OSGi framework system package list +# +# PluginManager uses these properties to construct the FRAMEWORK_SYSTEMPACKAGES list +# + +# Format is: +# <package>=<version> +# and PluginManager will convert this into: +# <package>; version=<version> +# e.g. org.osgi.framework; version=1.3.0 + +javax.management.openmbean=1.0.0 +javax.management=1.0.0 + +javax.security.auth=1.0.0 +javax.security.auth.callback=1.0.0 +javax.security.sasl=1.0.0 +javax.security=1.0.0 + +org.xml.sax=1.0.0 +org.xml.sax.helpers=1.0.0 + +org.osgi.framework=1.3.0 +org.osgi.service.packageadmin=1.2.0 +org.osgi.service.startlevel=1.0.0 +org.osgi.service.url=1.0.0 +org.osgi.util.tracker=1.0.0 + +org.apache.commons.configuration=1.0.0 + +org.apache.commons.lang=1.0.0 +org.apache.commons.lang.builder=1.0.0 +org.apache.commons.logging=1.0.0 + +org.apache.log4j=1.2.12 + +org.slf4j=1.6.1 + +# For Qpid packages (org.apache.qpid), the version number is automatically overridden by QpidPropertis#getReleaseVersion() + +org.apache.qpid.junit.extensions.util=0.0.0 +org.apache.qpid=0.0.0 +org.apache.qpid.common=0.0.0 +org.apache.qpid.exchange=0.0.0 +org.apache.qpid.framing=0.0.0 +org.apache.qpid.management.common.mbeans.annotations=0.0.0 +org.apache.qpid.protocol=0.0.0 +org.apache.qpid.transport=0.0.0 +org.apache.qpid.transport.codec=0.0.0 +org.apache.qpid.server.binding=0.0.0 +org.apache.qpid.server.configuration=0.0.0 +org.apache.qpid.server.configuration.plugins=0.0.0 +org.apache.qpid.server.configuration.management=0.0.0 +org.apache.qpid.server.exchange=0.0.0 +org.apache.qpid.server.logging=0.0.0 +org.apache.qpid.server.logging.actors=0.0.0 +org.apache.qpid.server.logging.subjects=0.0.0 +org.apache.qpid.server.management=0.0.0 +org.apache.qpid.server.persistent=0.0.0 +org.apache.qpid.server.plugins=0.0.0 +org.apache.qpid.server.protocol=0.0.0 +org.apache.qpid.server.queue=0.0.0 +org.apache.qpid.server.registry=0.0.0 +org.apache.qpid.server.security=0.0.0 +org.apache.qpid.server.security.access=0.0.0 +org.apache.qpid.server.security.access.plugins=0.0.0 +org.apache.qpid.server.security.auth=0.0.0 +org.apache.qpid.server.security.auth.sasl=0.0.0 +org.apache.qpid.server.security.auth.manager=0.0.0 +org.apache.qpid.server.virtualhost=0.0.0 +org.apache.qpid.server.virtualhost.plugins=0.0.0 +org.apache.qpid.util=0.0.0 + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index c8a7b56ccb..43c4fa26b7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -30,18 +30,22 @@ import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT; import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES; import java.io.File; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.configuration.ConfigurationException; import org.apache.felix.framework.Felix; import org.apache.felix.framework.util.StringMap; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.configuration.TopicConfiguration; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory; @@ -59,8 +63,11 @@ import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; +import org.apache.qpid.util.FileUtils; import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; import org.osgi.framework.BundleException; +import org.osgi.framework.Version; import org.osgi.framework.launch.Framework; import org.osgi.util.tracker.ServiceTracker; @@ -73,7 +80,6 @@ public class PluginManager implements Closeable private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; - private static final String QPID_VER_SUFFIX = "version=0.13,"; private Framework _felix; @@ -92,7 +98,50 @@ public class PluginManager implements Closeable private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>(); private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>(); - public PluginManager(String pluginPath, String cachePath) throws Exception + /** The default name of the OSGI system package list. */ + private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties"; + + /** The name of the override system property that holds the name of the OSGI system package list. */ + private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties"; + + private static final String OSGI_SYSTEM_PACKAGES; + + static + { + final String filename = System.getProperty(FILE_PROPERTY); + final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME, + PluginManager.class.getClassLoader()); + + try + { + Version qpidReleaseVersion; + try + { + qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion()); + } + catch (IllegalArgumentException iae) + { + qpidReleaseVersion = null; + } + + final Properties p = new Properties(); + p.load(is); + + final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p); + + OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString(); + + _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES); + } + catch (IOException e) + { + _logger.error("Error reading OSGI system package list", e); + throw new ExceptionInInitializerError(e); + } + } + + + public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception { // Store all non-OSGi plugins // A little gross that we have to add them here, but not all the plugins are OSGIfied @@ -131,127 +180,97 @@ public class PluginManager implements Closeable _authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory); } - // Check the plugin directory path is set and exist - if (pluginPath == null) - { - return; - } - File pluginDir = new File(pluginPath); - if (!pluginDir.exists()) + if(bundleContext == null) { - return; - } - - // Setup OSGi configuration propery map - StringMap configMap = new StringMap(false); - - // Add the bundle provided service interface package and the core OSGi - // packages to be exported from the class path via the system bundle. - configMap.put(FRAMEWORK_SYSTEMPACKAGES, - "org.osgi.framework; version=1.3.0," + - "org.osgi.service.packageadmin; version=1.2.0," + - "org.osgi.service.startlevel; version=1.0.0," + - "org.osgi.service.url; version=1.0.0," + - "org.osgi.util.tracker; version=1.0.0," + - "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX + - "org.apache.qpid; " + QPID_VER_SUFFIX + - "org.apache.qpid.common; " + QPID_VER_SUFFIX + - "org.apache.qpid.exchange; " + QPID_VER_SUFFIX + - "org.apache.qpid.framing; " + QPID_VER_SUFFIX + - "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX + - "org.apache.qpid.protocol; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.management; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.security; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX + - "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX + - "org.apache.qpid.util; " + QPID_VER_SUFFIX + - "org.apache.commons.configuration; version=1.0.0," + - "org.apache.commons.lang; version=1.0.0," + - "org.apache.commons.lang.builder; version=1.0.0," + - "org.apache.commons.logging; version=1.0.0," + - "org.apache.log4j; version=1.2.12," + - "javax.management.openmbean; version=1.0.0," + - "javax.management; version=1.0.0," + - "javax.security.auth; version=1.0.0" - ); - - // No automatic shutdown hook - configMap.put("felix.shutdown.hook", "false"); - - // Add system activator - List<BundleActivator> activators = new ArrayList<BundleActivator>(); - _activator = new Activator(); - activators.add(_activator); - configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators); + // Check the plugin directory path is set and exist + if (pluginPath == null) + { + _logger.info("No plugin path specified, no plugins will be loaded."); + return; + } + File pluginDir = new File(pluginPath); + if (!pluginDir.exists()) + { + _logger.warn("Plugin dir : " + pluginDir + " does not exist."); + return; + } - if (cachePath != null) - { - File cacheDir = new File(cachePath); - if (!cacheDir.exists() && cacheDir.canWrite()) + // Add the bundle provided service interface package and the core OSGi + // packages to be exported from the class path via the system bundle. + + // Setup OSGi configuration property map + final StringMap configMap = new StringMap(false); + configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES); + + // No automatic shutdown hook + configMap.put("felix.shutdown.hook", "false"); + + // Add system activator + List<BundleActivator> activators = new ArrayList<BundleActivator>(); + _activator = new Activator(); + activators.add(_activator); + configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators); + + if (cachePath != null) { - _logger.info("Creating plugin cache directory: " + cachePath); - cacheDir.mkdir(); + File cacheDir = new File(cachePath); + if (!cacheDir.exists() && cacheDir.canWrite()) + { + _logger.info("Creating plugin cache directory: " + cachePath); + cacheDir.mkdir(); + } + + // Set plugin cache directory and empty it + _logger.info("Cache bundles in directory " + cachePath); + configMap.put(FRAMEWORK_STORAGE, cachePath); } - - // Set plugin cache directory and empty it - _logger.info("Cache bundles in directory " + cachePath); - configMap.put(FRAMEWORK_STORAGE, cachePath); - } - configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT); - - // Set directory with plugins to auto-deploy - _logger.info("Auto deploying bundles from directory " + pluginPath); - configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath); - configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE); - - // Start plugin manager and trackers - _felix = new Felix(configMap); - try - { - _logger.info("Starting plugin manager..."); - _felix.init(); - process(configMap, _felix.getBundleContext()); - _felix.start(); - _logger.info("Started plugin manager"); + configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT); + + // Set directory with plugins to auto-deploy + _logger.info("Auto deploying bundles from directory " + pluginPath); + configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath); + configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE); + + // Start plugin manager + _felix = new Felix(configMap); + try + { + _logger.info("Starting plugin manager framework"); + _felix.init(); + process(configMap, _felix.getBundleContext()); + _felix.start(); + _logger.info("Started plugin manager framework"); + } + catch (BundleException e) + { + throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e); + } + + bundleContext = _activator.getContext(); } - catch (BundleException e) + else { - throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e); + _logger.info("Using the specified external BundleContext"); } - + // TODO save trackers in a map, keyed by class name - _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null); + _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null); _exchangeTracker.open(); - _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null); + _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null); _securityTracker.open(); - _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null); + _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null); _configTracker.open(); - _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null); + _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null); _virtualHostTracker.open(); - _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null); + _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null); _policyTracker.open(); - _authenticationManagerTracker = new ServiceTracker(_activator.getContext(), AuthenticationManagerPluginFactory.class.getName(), null); + _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null); _authenticationManagerTracker.open(); _logger.info("Opened service trackers"); @@ -331,21 +350,21 @@ public class PluginManager implements Closeable public void close() { - if (_felix != null) + try { - try - { - // Close all bundle trackers - _exchangeTracker.close(); - _securityTracker.close(); - _configTracker.close(); - _virtualHostTracker.close(); - _policyTracker.close(); - _authenticationManagerTracker.close(); - } - finally + // Close all bundle trackers + _exchangeTracker.close(); + _securityTracker.close(); + _configTracker.close(); + _virtualHostTracker.close(); + _policyTracker.close(); + _authenticationManagerTracker.close(); + } + finally + { + if (_felix != null) { - _logger.info("Stopping plugin manager"); + _logger.info("Stopping plugin manager framework"); try { // FIXME should be stopAndWait() but hangs VM, need upgrade in felix @@ -364,7 +383,12 @@ public class PluginManager implements Closeable { // Ignore } - _logger.info("Stopped plugin manager"); + _logger.info("Stopped plugin manager framework"); + } + else + { + _logger.info("Plugin manager was started with an external BundleContext, " + + "skipping remaining shutdown tasks"); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 460ea93509..8a7159bdc2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.protocol; -import java.util.EnumSet; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -32,23 +31,12 @@ import org.apache.qpid.transport.network.NetworkConnection; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { - private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class); private static final AtomicLong ID_GENERATOR = new AtomicLong(0); private final IApplicationRegistry _appRegistry; private final String _fqdn; private final Set<AmqpProtocolVersion> _supported; - public MultiVersionProtocolEngineFactory() - { - this("localhost", ALL_VERSIONS); - } - - public MultiVersionProtocolEngineFactory(String fqdn) - { - this(fqdn, ALL_VERSIONS); - } - public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions) { _appRegistry = ApplicationRegistry.getInstance(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index bfdb30764a..c07074f69c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -65,6 +65,7 @@ import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.osgi.framework.BundleContext; /** @@ -111,6 +112,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + private BundleContext _bundleContext; + static { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); @@ -209,7 +212,13 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected ApplicationRegistry(ServerConfiguration configuration) { + this(configuration, null); + } + + protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext) + { _configuration = configuration; + _bundleContext = bundleContext; } public void configure() throws ConfigurationException @@ -218,7 +227,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { - _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory()); + _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext); } catch (Exception e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index ff2a8c959b..9121f8f927 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -29,12 +29,18 @@ import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.management.JMXManagedObjectRegistry; import org.apache.qpid.server.management.NoopManagedObjectRegistry; +import org.osgi.framework.BundleContext; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { - super(new ServerConfiguration(configurationURL)); + this(configurationURL, null); + } + + public ConfigurationFileApplicationRegistry(File configurationURL, BundleContext bundleContext) throws ConfigurationException + { + super(new ServerConfiguration(configurationURL), bundleContext); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java index d6a09d8217..d6f6c714e2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java @@ -21,12 +21,11 @@ package org.apache.qpid.server.security.auth.sasl; import java.security.Provider; -import java.security.Security; import java.util.Map; import javax.security.sasl.SaslServerFactory; -public final class JCAProvider extends Provider +public class JCAProvider extends Provider { public JCAProvider(String name, Map<String, Class<? extends SaslServerFactory>> providerMap) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/signal/SignalHandlerTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/signal/SignalHandlerTask.java new file mode 100644 index 0000000000..4e3fae1dbd --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/signal/SignalHandlerTask.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.signal; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +import org.apache.log4j.Logger; + +public abstract class SignalHandlerTask +{ + private static final Logger LOGGER = Logger.getLogger(SignalHandlerTask.class); + + private static final String HANDLE_METHOD = "handle"; + private static final String SUN_MISC_SIGNAL_CLASS = "sun.misc.Signal"; + private static final String SUN_MISC_SIGNAL_HANDLER_CLASS = "sun.misc.SignalHandler"; + + public boolean register(final String signalName) + { + try + { + //try to load the signal handling classes + Class<?> signalClazz = Class.forName(SUN_MISC_SIGNAL_CLASS); + Class<?> handlerClazz = Class.forName(SUN_MISC_SIGNAL_HANDLER_CLASS); + + //create an InvocationHandler that just executes the SignalHandlerTask + InvocationHandler invoker = new InvocationHandler() + { + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + handle(); + + return null; + } + }; + + //create a dynamic proxy implementing SignalHandler + Object handler = Proxy.newProxyInstance(handlerClazz.getClassLoader(), new Class[]{handlerClazz}, invoker); + + //create the Signal to handle + Constructor<?> signalConstructor = signalClazz.getConstructor(String.class); + Object signal = signalConstructor.newInstance(signalName); + + //invoke the Signal.handle(signal, handler) method + Method handleMethod = signalClazz.getMethod(HANDLE_METHOD, signalClazz, handlerClazz); + handleMethod.invoke(null, signal, handler); + } + catch (Exception e) + { + LOGGER.debug("Unable to register handler for Signal " + signalName + " due to exception: " + e, e); + return false; + } + + return true; + } + + public abstract void handle(); + + public static String getPlatformDescription() + { + String name = System.getProperty("os.name"); + String osVer = System.getProperty("os.version"); + String jvmVendor = System.getProperty("java.vm.vendor"); + String jvmName = System.getProperty("java.vm.name"); + String javaRuntimeVer = System.getProperty("java.runtime.version"); + + return "OS: " + name + " " + osVer + ", JVM:" + jvmVendor + " " + jvmName + " " + javaRuntimeVer; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 0f6480253c..0921decda4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -109,15 +109,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private final MessageAcquireMode _acquireMode; private MessageFlowMode _flowMode; private final ServerSession _session; - private AtomicBoolean _stopped = new AtomicBoolean(true); + private final AtomicBoolean _stopped = new AtomicBoolean(true); private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0]; private LogActor _logActor; - private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); + private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>(); private UUID _id; private String _traceExclude; private String _trace; - private long _createTime = System.currentTimeMillis(); + private final long _createTime = System.currentTimeMillis(); private final AtomicLong _deliveredCount = new AtomicLong(0); private final Map<String, Object> _arguments; @@ -711,13 +711,22 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void stop() { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + try { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + getSendLock(); + + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } + _stopped.set(true); + FlowCreditManager_0_10 creditManager = getCreditManager(); + creditManager.clearCredit(); + } + finally + { + releaseSendLock(); } - _stopped.set(true); - FlowCreditManager_0_10 creditManager = getCreditManager(); - creditManager.clearCredit(); } public void addCredit(MessageCreditUnit unit, long value) |