diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java new file mode 100644 index 0000000000..4e8d64a136 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.plugins; + +import static org.apache.felix.framework.util.FelixConstants.*; +import static org.apache.felix.main.AutoProcessor.*; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.felix.framework.Felix; +import org.apache.felix.framework.util.StringMap; +import org.apache.log4j.Logger; +import org.apache.qpid.common.Closeable; +import org.apache.qpid.server.configuration.TopicConfiguration; +import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory; +import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory; +import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.exchange.ExchangeType; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.SecurityPluginFactory; +import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.security.access.plugins.DenyAll; +import org.apache.qpid.server.security.access.plugins.LegacyAccess; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import org.apache.qpid.server.virtualhost.plugins.SlowConsumerDetection; +import org.apache.qpid.server.virtualhost.plugins.policies.TopicDeletePolicy; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleException; +import org.osgi.framework.launch.Framework; +import org.osgi.util.tracker.ServiceTracker; + +/** + * Provides access to pluggable elements, such as exchanges + */ +@SuppressWarnings("unchecked") +public class PluginManager implements Closeable +{ + private static final Logger _logger = Logger.getLogger(PluginManager.class); + + private static final int FELIX_STOP_TIMEOUT = 30000; + private static final String QPID_VER_SUFFIX = "version=0.11,"; + + private Framework _felix; + + private ServiceTracker _exchangeTracker = null; + private ServiceTracker _securityTracker = null; + private ServiceTracker _configTracker = null; + private ServiceTracker _virtualHostTracker = null; + private ServiceTracker _policyTracker = null; + + private Activator _activator; + + private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>(); + private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>(); + private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>(); + private Map<String, SlowConsumerPolicyPluginFactory> _policyPlugins = new HashMap<String, SlowConsumerPolicyPluginFactory>(); + + public PluginManager(String pluginPath, String cachePath) throws Exception + { + // Store all non-OSGi plugins + // A little gross that we have to add them here, but not all the plugins are OSGIfied + for (SecurityPluginFactory<?> pluginFactory : Arrays.asList( + AllowAll.FACTORY, DenyAll.FACTORY, LegacyAccess.FACTORY)) + { + _securityPlugins.put(pluginFactory.getPluginName(), pluginFactory); + } + for (ConfigurationPluginFactory configFactory : Arrays.asList( + TopicConfiguration.FACTORY, + SecurityManager.SecurityConfiguration.FACTORY, + AllowAll.AllowAllConfiguration.FACTORY, + DenyAll.DenyAllConfiguration.FACTORY, + LegacyAccess.LegacyAccessConfiguration.FACTORY, + new SlowConsumerDetectionConfigurationFactory(), + new SlowConsumerDetectionPolicyConfigurationFactory(), + new SlowConsumerDetectionQueueConfigurationFactory())) + { + _configPlugins.put(configFactory.getParentPaths(), configFactory); + } + for (SlowConsumerPolicyPluginFactory pluginFactory : Arrays.asList( + new TopicDeletePolicy.TopicDeletePolicyFactory())) + { + _policyPlugins.put(pluginFactory.getPluginName(), pluginFactory); + } + for (VirtualHostPluginFactory pluginFactory : Arrays.asList( + new SlowConsumerDetection.SlowConsumerFactory())) + { + _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory); + } + + // Check the plugin directory path is set and exist + if (pluginPath == null) + { + return; + } + File pluginDir = new File(pluginPath); + if (!pluginDir.exists()) + { + return; + } + + // Setup OSGi configuration propery map + StringMap configMap = new StringMap(false); + + // Add the bundle provided service interface package and the core OSGi + // packages to be exported from the class path via the system bundle. + configMap.put(FRAMEWORK_SYSTEMPACKAGES, + "org.osgi.framework; version=1.3.0," + + "org.osgi.service.packageadmin; version=1.2.0," + + "org.osgi.service.startlevel; version=1.0.0," + + "org.osgi.service.url; version=1.0.0," + + "org.osgi.util.tracker; version=1.0.0," + + "org.apache.qpid.junit.extensions.util; " + QPID_VER_SUFFIX + + "org.apache.qpid; " + QPID_VER_SUFFIX + + "org.apache.qpid.common; " + QPID_VER_SUFFIX + + "org.apache.qpid.exchange; " + QPID_VER_SUFFIX + + "org.apache.qpid.framing; " + QPID_VER_SUFFIX + + "org.apache.qpid.management.common.mbeans.annotations; " + QPID_VER_SUFFIX + + "org.apache.qpid.protocol; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.binding; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.configuration.management; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.exchange; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging.actors; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.logging.subjects; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.management; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.persistent; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.protocol; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.queue; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.registry; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security.access; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.security.access.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.virtualhost; " + QPID_VER_SUFFIX + + "org.apache.qpid.server.virtualhost.plugins; " + QPID_VER_SUFFIX + + "org.apache.qpid.util; " + QPID_VER_SUFFIX + + "org.apache.commons.configuration; version=1.0.0," + + "org.apache.commons.lang; version=1.0.0," + + "org.apache.commons.lang.builder; version=1.0.0," + + "org.apache.commons.logging; version=1.0.0," + + "org.apache.log4j; version=1.2.12," + + "javax.management.openmbean; version=1.0.0," + + "javax.management; version=1.0.0" + ); + + // No automatic shutdown hook + configMap.put("felix.shutdown.hook", "false"); + + // Add system activator + List<BundleActivator> activators = new ArrayList<BundleActivator>(); + _activator = new Activator(); + activators.add(_activator); + configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators); + + if (cachePath != null) + { + File cacheDir = new File(cachePath); + if (!cacheDir.exists() && cacheDir.canWrite()) + { + _logger.info("Creating plugin cache directory: " + cachePath); + cacheDir.mkdir(); + } + + // Set plugin cache directory and empty it + _logger.info("Cache bundles in directory " + cachePath); + configMap.put(FRAMEWORK_STORAGE, cachePath); + } + configMap.put(FRAMEWORK_STORAGE_CLEAN, FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT); + + // Set directory with plugins to auto-deploy + _logger.info("Auto deploying bundles from directory " + pluginPath); + configMap.put(AUTO_DEPLOY_DIR_PROPERY, pluginPath); + configMap.put(AUTO_DEPLOY_ACTION_PROPERY, AUTO_DEPLOY_INSTALL_VALUE + "," + AUTO_DEPLOY_START_VALUE); + + // Start plugin manager and trackers + _felix = new Felix(configMap); + try + { + _logger.info("Starting plugin manager..."); + _felix.init(); + process(configMap, _felix.getBundleContext()); + _felix.start(); + _logger.info("Started plugin manager"); + } + catch (BundleException e) + { + throw new ConfigurationException("Could not start plugin manager: " + e.getMessage(), e); + } + + // TODO save trackers in a map, keyed by class name + + _exchangeTracker = new ServiceTracker(_activator.getContext(), ExchangeType.class.getName(), null); + _exchangeTracker.open(); + + _securityTracker = new ServiceTracker(_activator.getContext(), SecurityPluginFactory.class.getName(), null); + _securityTracker.open(); + + _configTracker = new ServiceTracker(_activator.getContext(), ConfigurationPluginFactory.class.getName(), null); + _configTracker.open(); + + _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null); + _virtualHostTracker.open(); + + _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null); + _policyTracker.open(); + + _logger.info("Opened service trackers"); + } + + private static <T> Map<String, T> getServices(ServiceTracker tracker) + { + Map<String, T> services = new HashMap<String, T>(); + + if ((tracker != null) && (tracker.getServices() != null)) + { + for (Object service : tracker.getServices()) + { + if (service instanceof PluginFactory<?>) + { + services.put(((PluginFactory<?>) service).getPluginName(), (T) service); + } + else + { + services.put(service.getClass().getName(), (T) service); + } + } + } + + return services; + } + + public static <T> Map<String, T> getServices(ServiceTracker tracker, Map<String, T> plugins) + { + Map<String, T> services = getServices(tracker); + services.putAll(plugins); + return services; + } + + public Map<List<String>, ConfigurationPluginFactory> getConfigurationPlugins() + { + Map<List<String>, ConfigurationPluginFactory> services = new IdentityHashMap<List<String>, ConfigurationPluginFactory>(); + + if (_configTracker != null && _configTracker.getServices() != null) + { + for (Object service : _configTracker.getServices()) + { + ConfigurationPluginFactory factory = (ConfigurationPluginFactory) service; + services.put(factory.getParentPaths(), factory); + } + } + + services.putAll(_configPlugins); + + return services; + } + + public Map<String, VirtualHostPluginFactory> getVirtualHostPlugins() + { + return getServices(_virtualHostTracker, _vhostPlugins); + } + + public Map<String, SlowConsumerPolicyPluginFactory> getSlowConsumerPlugins() + { + return getServices(_policyTracker, _policyPlugins); + } + + public Map<String, ExchangeType<?>> getExchanges() + { + return getServices(_exchangeTracker); + } + + public Map<String, SecurityPluginFactory> getSecurityPlugins() + { + return getServices(_securityTracker, _securityPlugins); + } + + public void close() + { + if (_felix != null) + { + try + { + // Close all bundle trackers + _exchangeTracker.close(); + _securityTracker.close(); + _configTracker.close(); + _virtualHostTracker.close(); + _policyTracker.close(); + } + finally + { + _logger.info("Stopping plugin manager"); + try + { + // FIXME should be stopAndWait() but hangs VM, need upgrade in felix + _felix.stop(); + } + catch (BundleException e) + { + // Ignore + } + + try + { + _felix.waitForStop(FELIX_STOP_TIMEOUT); + } + catch (InterruptedException e) + { + // Ignore + } + _logger.info("Stopped plugin manager"); + } + } + } +} |