summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2011-08-22 20:14:35 +0000
committerRobert Godfrey <rgodfrey@apache.org>2011-08-22 20:14:35 +0000
commit80f7040fb49757f80b9450bf497cbb1ff4f4f154 (patch)
treebf5c72e73579d6df3116ad13fa243730ed36a7fb /qpid/java/broker/src/main/java/org
parent9d88761d6711f7f8722091fcb8849c0e89a8b8c9 (diff)
downloadqpid-python-80f7040fb49757f80b9450bf497cbb1ff4f4f154.tar.gz
NO-JIRA : Merge to 1-0 sandbox branch from current HEAD - r1160304
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1160414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main/java/org')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java88
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtil.java91
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties93
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java260
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/signal/SignalHandlerTask.java89
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java25
12 files changed, 537 insertions, 212 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index cdc022ddac..e29b07b3e7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -34,6 +34,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.logging.*;
+import javax.net.ssl.SSLContext;
+
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.QpidLog4JConfigurator;
import org.apache.qpid.server.configuration.ServerConfiguration;
@@ -46,8 +48,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.transport.QpidAcceptor;
@@ -115,7 +117,7 @@ public class Broker
configureLogging(logConfigFile, options.getLogWatchFrequency());
- ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
+ ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
ServerConfiguration serverConfig = config.getConfiguration();
updateManagementPort(serverConfig, options.getJmxPort());
@@ -199,57 +201,41 @@ public class Broker
{
for(int port : ports)
{
- Set<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
-
- if(exclude_0_10.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_10);
- }
-
- if(exclude_0_9_1.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_9_1);
- }
- if(exclude_0_9.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_9);
- }
- if(exclude_0_8.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_8);
- }
-
- NetworkTransportConfiguration settings =
- new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
-
- IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
- MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(hostName, supported);
+ final Set<AmqpProtocolVersion> supported =
+ getSupportedVersions(port, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
+ final NetworkTransportConfiguration settings =
+ new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
+
+ final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
+ final MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(hostName, supported);
transport.accept(settings, protocolEngineFactory, null);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(transport,"TCP"));
+ new QpidAcceptor(transport,"TCP"));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
}
}
if (serverConfig.getEnableSSL())
{
- String keystorePath = serverConfig.getKeystorePath();
- String keystorePassword = serverConfig.getKeystorePassword();
- String certType = serverConfig.getCertType();
- SSLContextFactory sslFactory =
- new SSLContextFactory(keystorePath, keystorePassword, certType);
+ final String keystorePath = serverConfig.getKeystorePath();
+ final String keystorePassword = serverConfig.getKeystorePassword();
+ final String certType = serverConfig.getCertType();
+ final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType);
for(int sslPort : sslPorts)
{
- NetworkTransportConfiguration settings =
+ final Set<AmqpProtocolVersion> supported =
+ getSupportedVersions(sslPort, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8);
+ final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, sslPort, bindAddress.getHostName(), Transport.TCP);
- IncomingNetworkTransport transport = new MinaNetworkTransport();
-
- transport.accept(settings, new MultiVersionProtocolEngineFactory(), sslFactory);
+ final IncomingNetworkTransport transport = new MinaNetworkTransport();
+ final MultiVersionProtocolEngineFactory protocolEngineFactory =
+ new MultiVersionProtocolEngineFactory(hostName, supported);
+ transport.accept(settings, protocolEngineFactory, sslContext);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort),
new QpidAcceptor(transport,"TCP"));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
@@ -265,6 +251,32 @@ public class Broker
}
}
+ private static Set<AmqpProtocolVersion> getSupportedVersions(final int port, final Set<Integer> exclude_0_10,
+ final Set<Integer> exclude_0_9_1, final Set<Integer> exclude_0_9,
+ final Set<Integer> exclude_0_8)
+ {
+ final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
+
+ if(exclude_0_10.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_10);
+ }
+ if(exclude_0_9_1.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_9_1);
+ }
+ if(exclude_0_9.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_9);
+ }
+ if(exclude_0_8.contains(port))
+ {
+ supported.remove(AmqpProtocolVersion.v0_8);
+ }
+
+ return supported;
+ }
+
private File getConfigFile(final String fileName,
final String defaultFileName,
final String qpidHome, boolean throwOnFileNotFound) throws InitException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
index b83da92660..b2d029eb95 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.osgi.framework.BundleContext;
+
public class BrokerOptions
{
/** serialVersionUID */
@@ -51,9 +53,11 @@ public class BrokerOptions
private String _logConfigFile;
private String _bind;
private Integer _jmxPort;
+ private BundleContext _bundleContext;
private Integer _logWatchFrequency = 0;
+
public void addPort(final int port)
{
_ports.add(port);
@@ -149,4 +153,14 @@ public class BrokerOptions
{
_logWatchFrequency = logWatchFrequency;
}
+
+ public BundleContext getBundleContext()
+ {
+ return _bundleContext ;
+ }
+
+ public void setBundleContext(final BundleContext bundleContext)
+ {
+ _bundleContext = bundleContext;
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 0621b87f0a..02f8a346cf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -39,17 +39,13 @@ import org.apache.commons.configuration.HierarchicalConfiguration;
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.signal.SignalHandlerTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
-public class ServerConfiguration extends ConfigurationPlugin implements SignalHandler
+public class ServerConfiguration extends ConfigurationPlugin
{
protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class);
@@ -60,7 +56,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public static final int DEFAULT_FRAME_SIZE = 65536;
public static final int DEFAULT_PORT = 5672;
- public static final int DEFAULT_SSL_PORT = 8672;
+ public static final int DEFAULT_SSL_PORT = 5671;
public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
public static final int DEFAULT_JMXPORT = 8999;
@@ -75,10 +71,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
private File _configFile;
private File _vhostsFile;
- private Logger _log = Logger.getLogger(this.getClass());
-
- private ConfigurationManagementMBean _mbean;
-
// Map of environment variables to config items
private static final Map<String, String> envVarMap = new HashMap<String, String>();
@@ -137,15 +129,26 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
{
this(parseConfig(configurationURL));
_configFile = configurationURL;
- try
+
+ SignalHandlerTask hupReparseTask = new SignalHandlerTask()
{
- Signal sig = new sun.misc.Signal("HUP");
- sun.misc.Signal.handle(sig, this);
- }
- catch (Exception e)
+ public void handle()
+ {
+ try
+ {
+ reparseConfigFileSecuritySections();
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.error("Could not reload configuration file security sections", e);
+ }
+ }
+ };
+
+ if(!hupReparseTask.register("HUP"))
{
- _logger.info("Signal HUP not supported for OS: " + System.getProperty("os.name"));
- // We're on something that doesn't handle SIGHUP, how sad, Windows.
+ _logger.info("Unable to register Signal HUP handler to reload security configuration.");
+ _logger.info("Signal HUP not supported for this OS / JVM combination - " + SignalHandlerTask.getPlatformDescription());
}
}
@@ -416,18 +419,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
return _configFile == null ? "" : _configFile.getAbsolutePath();
}
- public void handle(Signal arg0)
- {
- try
- {
- reparseConfigFileSecuritySections();
- }
- catch (ConfigurationException e)
- {
- _logger.error("Could not reload configuration file security sections", e);
- }
- }
-
public void reparseConfigFileSecuritySections() throws ConfigurationException
{
if (_configFile != null)
@@ -688,12 +679,12 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public String getKeystorePath()
{
- return getStringValue("connector.ssl.keystorePath", "none");
+ return getStringValue("connector.ssl.keystorePath");
}
public String getKeystorePassword()
{
- return getStringValue("connector.ssl.keystorePassword", "none");
+ return getStringValue("connector.ssl.keystorePassword");
}
public String getCertType()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtil.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtil.java
new file mode 100644
index 0000000000..644f714c8c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackageUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.plugins;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.osgi.framework.Version;
+
+/**
+ * Utility class to convert a map of package name to version numbers into the string
+ * with the format expected of a OSGi system package declaration:
+ *
+ * <code>
+ * org.xyz; version=1.0.0, org.xyz.xyz; version=1.0.0,...
+ * </code>
+ *
+ * Additionally, if the caller has provided a qpidPackageReleaseNumber and the package
+ * begins org.apache.qpid, this release number will be used, in preference to the one
+ * found in the Map.
+ *
+ * @see org.osgi.framework.Constants#FRAMEWORK_SYSTEMPACKAGES
+ *
+ */
+public class OsgiSystemPackageUtil
+{
+ private static final String APACHE_QPID_PKG_PREFIX = "org.apache.qpid";
+
+ private final Map<String, String> _packageNameVersionMap;
+ private final Version _qpidPackageReleaseNumber;
+
+ public OsgiSystemPackageUtil(final Version qpidPackageReleaseNumber, final Map<String, String> packageNameVersionMap)
+ {
+ _qpidPackageReleaseNumber = qpidPackageReleaseNumber;
+ _packageNameVersionMap = packageNameVersionMap;
+ }
+
+ public String getFormattedSystemPackageString()
+ {
+ if (_packageNameVersionMap == null || _packageNameVersionMap.size() == 0)
+ {
+ return null;
+ }
+
+ final StringBuilder packages = new StringBuilder();
+
+ for(Iterator<String> itr = _packageNameVersionMap.keySet().iterator(); itr.hasNext();)
+ {
+ final String packageName = itr.next();
+ final String packageVersion;
+
+ if (_qpidPackageReleaseNumber != null && packageName.startsWith(APACHE_QPID_PKG_PREFIX))
+ {
+ packageVersion = _qpidPackageReleaseNumber.toString();
+ }
+ else
+ {
+ packageVersion = _packageNameVersionMap.get(packageName);
+ }
+
+ packages.append(packageName);
+ packages.append("; ");
+ packages.append("version=");
+ packages.append(packageVersion);
+
+ if (itr.hasNext())
+ {
+ packages.append(", ");
+ }
+ }
+
+ return packages.toString();
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties
new file mode 100644
index 0000000000..aaab4f76cc
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/OsgiSystemPackages.properties
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one#
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# OSGi framework system package list
+#
+# PluginManager uses these properties to construct the FRAMEWORK_SYSTEMPACKAGES list
+#
+
+# Format is:
+# <package>=<version>
+# and PluginManager will convert this into:
+# <package>; version=<version>
+# e.g. org.osgi.framework; version=1.3.0
+
+javax.management.openmbean=1.0.0
+javax.management=1.0.0
+
+javax.security.auth=1.0.0
+javax.security.auth.callback=1.0.0
+javax.security.sasl=1.0.0
+javax.security=1.0.0
+
+org.xml.sax=1.0.0
+org.xml.sax.helpers=1.0.0
+
+org.osgi.framework=1.3.0
+org.osgi.service.packageadmin=1.2.0
+org.osgi.service.startlevel=1.0.0
+org.osgi.service.url=1.0.0
+org.osgi.util.tracker=1.0.0
+
+org.apache.commons.configuration=1.0.0
+
+org.apache.commons.lang=1.0.0
+org.apache.commons.lang.builder=1.0.0
+org.apache.commons.logging=1.0.0
+
+org.apache.log4j=1.2.12
+
+org.slf4j=1.6.1
+
+# For Qpid packages (org.apache.qpid), the version number is automatically overridden by QpidPropertis#getReleaseVersion()
+
+org.apache.qpid.junit.extensions.util=0.0.0
+org.apache.qpid=0.0.0
+org.apache.qpid.common=0.0.0
+org.apache.qpid.exchange=0.0.0
+org.apache.qpid.framing=0.0.0
+org.apache.qpid.management.common.mbeans.annotations=0.0.0
+org.apache.qpid.protocol=0.0.0
+org.apache.qpid.transport=0.0.0
+org.apache.qpid.transport.codec=0.0.0
+org.apache.qpid.server.binding=0.0.0
+org.apache.qpid.server.configuration=0.0.0
+org.apache.qpid.server.configuration.plugins=0.0.0
+org.apache.qpid.server.configuration.management=0.0.0
+org.apache.qpid.server.exchange=0.0.0
+org.apache.qpid.server.logging=0.0.0
+org.apache.qpid.server.logging.actors=0.0.0
+org.apache.qpid.server.logging.subjects=0.0.0
+org.apache.qpid.server.management=0.0.0
+org.apache.qpid.server.persistent=0.0.0
+org.apache.qpid.server.plugins=0.0.0
+org.apache.qpid.server.protocol=0.0.0
+org.apache.qpid.server.queue=0.0.0
+org.apache.qpid.server.registry=0.0.0
+org.apache.qpid.server.security=0.0.0
+org.apache.qpid.server.security.access=0.0.0
+org.apache.qpid.server.security.access.plugins=0.0.0
+org.apache.qpid.server.security.auth=0.0.0
+org.apache.qpid.server.security.auth.sasl=0.0.0
+org.apache.qpid.server.security.auth.manager=0.0.0
+org.apache.qpid.server.virtualhost=0.0.0
+org.apache.qpid.server.virtualhost.plugins=0.0.0
+org.apache.qpid.util=0.0.0
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
index c8a7b56ccb..43c4fa26b7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
@@ -30,18 +30,22 @@ import static org.osgi.framework.Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT;
import static org.osgi.framework.Constants.FRAMEWORK_SYSTEMPACKAGES;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.felix.framework.Felix;
import org.apache.felix.framework.util.StringMap;
import org.apache.log4j.Logger;
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.configuration.TopicConfiguration;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory;
@@ -59,8 +63,11 @@ import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
+import org.apache.qpid.util.FileUtils;
import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleException;
+import org.osgi.framework.Version;
import org.osgi.framework.launch.Framework;
import org.osgi.util.tracker.ServiceTracker;
@@ -73,7 +80,6 @@ public class PluginManager implements Closeable
private static final Logger _logger = Logger.getLogger(PluginManager.class);
private static final int FELIX_STOP_TIMEOUT = 30000;
- private static final String QPID_VER_SUFFIX = "version=0.13,";
private Framework _felix;
@@ -92,7 +98,50 @@ public class PluginManager implements Closeable
private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>();
private Map<String, AuthenticationManagerPluginFactory<? extends Plugin>> _authenticationManagerPlugins = new HashMap<String, AuthenticationManagerPluginFactory<? extends Plugin>>();
- public PluginManager(String pluginPath, String cachePath) throws Exception
+ /** The default name of the OSGI system package list. */
+ private static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/server/plugins/OsgiSystemPackages.properties";
+
+ /** The name of the override system property that holds the name of the OSGI system package list. */
+ private static final String FILE_PROPERTY = "qpid.osgisystempackages.properties";
+
+ private static final String OSGI_SYSTEM_PACKAGES;
+
+ static
+ {
+ final String filename = System.getProperty(FILE_PROPERTY);
+ final InputStream is = FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
+ PluginManager.class.getClassLoader());
+
+ try
+ {
+ Version qpidReleaseVersion;
+ try
+ {
+ qpidReleaseVersion = Version.parseVersion(QpidProperties.getReleaseVersion());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ qpidReleaseVersion = null;
+ }
+
+ final Properties p = new Properties();
+ p.load(is);
+
+ final OsgiSystemPackageUtil osgiSystemPackageUtil = new OsgiSystemPackageUtil(qpidReleaseVersion, (Map)p);
+
+ OSGI_SYSTEM_PACKAGES = osgiSystemPackageUtil.getFormattedSystemPackageString();
+
+ _logger.debug("List of OSGi system packages to be added: " + OSGI_SYSTEM_PACKAGES);
+ }
+ catch (IOException e)
+ {
+ _logger.error("Error reading OSGI system package list", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+
+ public PluginManager(String pluginPath, String cachePath, BundleContext bundleContext) throws Exception
{
// Store all non-OSGi plugins
// A little gross that we have to add them here, but not all the plugins are OSGIfied
@@ -131,127 +180,97 @@ public class PluginManager implements Closeable
_authenticationManagerPlugins.put(pluginFactory.getPluginName(), pluginFactory);
}
- // Check the plugin directory path is set and exist
- if (pluginPath == null)
- {
- return;
- }
- File pluginDir = new File(pluginPath);
- if (!pluginDir.exists())
+ if(bundleContext == null)
{
- return;
- }
-
- // Setup OSGi configuration propery map
- StringMap configMap = new StringMap(false);
-
- // Add the bundle provided service interface package and the core OSGi
- // packages to be exported from the class path via the system bundle.
- configMap.put(FRAMEWORK_SYSTEMPACKAGES,
- "org.osgi.framework; version=1.3.0," +
- "org.osgi.service.packageadmin; version=1.2.0," +
- "org.osgi.service.startlevel; version=1.0.0," +
- "org.osgi.service.url; version=1.0.0," +
- "org.osgi.util.tracker; version=1.0.0," +
- "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX +
- "org.apache.qpid; " + QPID_VER_SUFFIX +
- "org.apache.qpid.common; " + QPID_VER_SUFFIX +
- "org.apache.qpid.exchange; " + QPID_VER_SUFFIX +
- "org.apache.qpid.framing; " + QPID_VER_SUFFIX +
- "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX +
- "org.apache.qpid.protocol; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.management; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.security; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX +
- "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX +
- "org.apache.qpid.util; " + QPID_VER_SUFFIX +
- "org.apache.commons.configuration; version=1.0.0," +
- "org.apache.commons.lang; version=1.0.0," +
- "org.apache.commons.lang.builder; version=1.0.0," +
- "org.apache.commons.logging; version=1.0.0," +
- "org.apache.log4j; version=1.2.12," +
- "javax.management.openmbean; version=1.0.0," +
- "javax.management; version=1.0.0," +
- "javax.security.auth; version=1.0.0"
- );
-
- // No automatic shutdown hook
- configMap.put("felix.shutdown.hook", "false");
-
- // Add system activator
- List<BundleActivator> activators = new ArrayList<BundleActivator>();
- _activator = new Activator();
- activators.add(_activator);
- configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
+ // Check the plugin directory path is set and exist
+ if (pluginPath == null)
+ {
+ _logger.info("No plugin path specified, no plugins will be loaded.");
+ return;
+ }
+ File pluginDir = new File(pluginPath);
+ if (!pluginDir.exists())
+ {
+ _logger.warn("Plugin dir : " + pluginDir + " does not exist.");
+ return;
+ }
- if (cachePath != null)
- {
- File cacheDir = new File(cachePath);
- if (!cacheDir.exists() && cacheDir.canWrite())
+ // Add the bundle provided service interface package and the core OSGi
+ // packages to be exported from the class path via the system bundle.
+
+ // Setup OSGi configuration property map
+ final StringMap configMap = new StringMap(false);
+ configMap.put(FRAMEWORK_SYSTEMPACKAGES, OSGI_SYSTEM_PACKAGES);
+
+ // No automatic shutdown hook
+ configMap.put("felix.shutdown.hook", "false");
+
+ // Add system activator
+ List<BundleActivator> activators = new ArrayList<BundleActivator>();
+ _activator = new Activator();
+ activators.add(_activator);
+ configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators);
+
+ if (cachePath != null)
{
- _logger.info("Creating plugin cache directory: " + cachePath);
- cacheDir.mkdir();
+ File cacheDir = new File(cachePath);
+ if (!cacheDir.exists() && cacheDir.canWrite())
+ {
+ _logger.info("Creating plugin cache directory: " + cachePath);
+ cacheDir.mkdir();
+ }
+
+ // Set plugin cache directory and empty it
+ _logger.info("Cache bundles in directory " + cachePath);
+ configMap.put(FRAMEWORK_STORAGE, cachePath);
}
-
- // Set plugin cache directory and empty it
- _logger.info("Cache bundles in directory " + cachePath);
- configMap.put(FRAMEWORK_STORAGE, cachePath);
- }
- configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
-
- // Set directory with plugins to auto-deploy
- _logger.info("Auto deploying bundles from directory " + pluginPath);
- configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
- configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
-
- // Start plugin manager and trackers
- _felix = new Felix(configMap);
- try
- {
- _logger.info("Starting plugin manager...");
- _felix.init();
- process(configMap, _felix.getBundleContext());
- _felix.start();
- _logger.info("Started plugin manager");
+ configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+
+ // Set directory with plugins to auto-deploy
+ _logger.info("Auto deploying bundles from directory " + pluginPath);
+ configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath);
+ configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE);
+
+ // Start plugin manager
+ _felix = new Felix(configMap);
+ try
+ {
+ _logger.info("Starting plugin manager framework");
+ _felix.init();
+ process(configMap, _felix.getBundleContext());
+ _felix.start();
+ _logger.info("Started plugin manager framework");
+ }
+ catch (BundleException e)
+ {
+ throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+ }
+
+ bundleContext = _activator.getContext();
}
- catch (BundleException e)
+ else
{
- throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e);
+ _logger.info("Using the specified external BundleContext");
}
-
+
// TODO save trackers in a map, keyed by class name
- _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null);
+ _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null);
_exchangeTracker.open();
- _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null);
+ _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null);
_securityTracker.open();
- _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null);
+ _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null);
_configTracker.open();
- _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null);
+ _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null);
_virtualHostTracker.open();
- _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null);
+ _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null);
_policyTracker.open();
- _authenticationManagerTracker = new ServiceTracker(_activator.getContext(), AuthenticationManagerPluginFactory.class.getName(), null);
+ _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null);
_authenticationManagerTracker.open();
_logger.info("Opened service trackers");
@@ -331,21 +350,21 @@ public class PluginManager implements Closeable
public void close()
{
- if (_felix != null)
+ try
{
- try
- {
- // Close all bundle trackers
- _exchangeTracker.close();
- _securityTracker.close();
- _configTracker.close();
- _virtualHostTracker.close();
- _policyTracker.close();
- _authenticationManagerTracker.close();
- }
- finally
+ // Close all bundle trackers
+ _exchangeTracker.close();
+ _securityTracker.close();
+ _configTracker.close();
+ _virtualHostTracker.close();
+ _policyTracker.close();
+ _authenticationManagerTracker.close();
+ }
+ finally
+ {
+ if (_felix != null)
{
- _logger.info("Stopping plugin manager");
+ _logger.info("Stopping plugin manager framework");
try
{
// FIXME should be stopAndWait() but hangs VM, need upgrade in felix
@@ -364,7 +383,12 @@ public class PluginManager implements Closeable
{
// Ignore
}
- _logger.info("Stopped plugin manager");
+ _logger.info("Stopped plugin manager framework");
+ }
+ else
+ {
+ _logger.info("Plugin manager was started with an external BundleContext, " +
+ "skipping remaining shutdown tasks");
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 460ea93509..8a7159bdc2 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol;
-import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,23 +31,12 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
- private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private final IApplicationRegistry _appRegistry;
private final String _fqdn;
private final Set<AmqpProtocolVersion> _supported;
- public MultiVersionProtocolEngineFactory()
- {
- this("localhost", ALL_VERSIONS);
- }
-
- public MultiVersionProtocolEngineFactory(String fqdn)
- {
- this(fqdn, ALL_VERSIONS);
- }
-
public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
{
_appRegistry = ApplicationRegistry.getInstance();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index bfdb30764a..c07074f69c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -65,6 +65,7 @@ import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.osgi.framework.BundleContext;
/**
@@ -111,6 +112,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private BundleContext _bundleContext;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -209,7 +212,13 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected ApplicationRegistry(ServerConfiguration configuration)
{
+ this(configuration, null);
+ }
+
+ protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext)
+ {
_configuration = configuration;
+ _bundleContext = bundleContext;
}
public void configure() throws ConfigurationException
@@ -218,7 +227,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
- _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory());
+ _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext);
}
catch (Exception e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index ff2a8c959b..9121f8f927 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -29,12 +29,18 @@ import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.JMXManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.osgi.framework.BundleContext;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
- super(new ServerConfiguration(configurationURL));
+ this(configurationURL, null);
+ }
+
+ public ConfigurationFileApplicationRegistry(File configurationURL, BundleContext bundleContext) throws ConfigurationException
+ {
+ super(new ServerConfiguration(configurationURL), bundleContext);
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
index d6a09d8217..d6f6c714e2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/JCAProvider.java
@@ -21,12 +21,11 @@
package org.apache.qpid.server.security.auth.sasl;
import java.security.Provider;
-import java.security.Security;
import java.util.Map;
import javax.security.sasl.SaslServerFactory;
-public final class JCAProvider extends Provider
+public class JCAProvider extends Provider
{
public JCAProvider(String name, Map<String, Class<? extends SaslServerFactory>> providerMap)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/signal/SignalHandlerTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/signal/SignalHandlerTask.java
new file mode 100644
index 0000000000..4e3fae1dbd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/signal/SignalHandlerTask.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.signal;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.log4j.Logger;
+
+public abstract class SignalHandlerTask
+{
+ private static final Logger LOGGER = Logger.getLogger(SignalHandlerTask.class);
+
+ private static final String HANDLE_METHOD = "handle";
+ private static final String SUN_MISC_SIGNAL_CLASS = "sun.misc.Signal";
+ private static final String SUN_MISC_SIGNAL_HANDLER_CLASS = "sun.misc.SignalHandler";
+
+ public boolean register(final String signalName)
+ {
+ try
+ {
+ //try to load the signal handling classes
+ Class<?> signalClazz = Class.forName(SUN_MISC_SIGNAL_CLASS);
+ Class<?> handlerClazz = Class.forName(SUN_MISC_SIGNAL_HANDLER_CLASS);
+
+ //create an InvocationHandler that just executes the SignalHandlerTask
+ InvocationHandler invoker = new InvocationHandler()
+ {
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ handle();
+
+ return null;
+ }
+ };
+
+ //create a dynamic proxy implementing SignalHandler
+ Object handler = Proxy.newProxyInstance(handlerClazz.getClassLoader(), new Class[]{handlerClazz}, invoker);
+
+ //create the Signal to handle
+ Constructor<?> signalConstructor = signalClazz.getConstructor(String.class);
+ Object signal = signalConstructor.newInstance(signalName);
+
+ //invoke the Signal.handle(signal, handler) method
+ Method handleMethod = signalClazz.getMethod(HANDLE_METHOD, signalClazz, handlerClazz);
+ handleMethod.invoke(null, signal, handler);
+ }
+ catch (Exception e)
+ {
+ LOGGER.debug("Unable to register handler for Signal " + signalName + " due to exception: " + e, e);
+ return false;
+ }
+
+ return true;
+ }
+
+ public abstract void handle();
+
+ public static String getPlatformDescription()
+ {
+ String name = System.getProperty("os.name");
+ String osVer = System.getProperty("os.version");
+ String jvmVendor = System.getProperty("java.vm.vendor");
+ String jvmName = System.getProperty("java.vm.name");
+ String javaRuntimeVer = System.getProperty("java.runtime.version");
+
+ return "OS: " + name + " " + osVer + ", JVM:" + jvmVendor + " " + jvmName + " " + javaRuntimeVer;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 0f6480253c..0921decda4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -109,15 +109,15 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private final MessageAcquireMode _acquireMode;
private MessageFlowMode _flowMode;
private final ServerSession _session;
- private AtomicBoolean _stopped = new AtomicBoolean(true);
+ private final AtomicBoolean _stopped = new AtomicBoolean(true);
private static final Struct[] EMPTY_STRUCT_ARRAY = new Struct[0];
private LogActor _logActor;
- private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+ private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
private UUID _id;
private String _traceExclude;
private String _trace;
- private long _createTime = System.currentTimeMillis();
+ private final long _createTime = System.currentTimeMillis();
private final AtomicLong _deliveredCount = new AtomicLong(0);
private final Map<String, Object> _arguments;
@@ -711,13 +711,22 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void stop()
{
- if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ try
{
- _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ getSendLock();
+
+ if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+ {
+ _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+ }
+ _stopped.set(true);
+ FlowCreditManager_0_10 creditManager = getCreditManager();
+ creditManager.clearCredit();
+ }
+ finally
+ {
+ releaseSendLock();
}
- _stopped.set(true);
- FlowCreditManager_0_10 creditManager = getCreditManager();
- creditManager.clearCredit();
}
public void addCredit(MessageCreditUnit unit, long value)