diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins')
5 files changed, 810 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java new file mode 100644 index 0000000000..82b576ea51 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.configuration.plugins; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.ConversionException; +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.ConfigurationManager; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +public abstract class ConfigurationPlugin +{ + protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class); + + private Map<String, ConfigurationPlugin> + _pluginConfiguration = new HashMap<String, ConfigurationPlugin>(); + + protected Configuration _configuration; + + /** + * The Elements that this Plugin can process. + * + * For a Queues plugin that would be a list containing: + * <ul> + * <li>queue - the queue entries + * <li>the alerting values for defaults + * <li>exchange - the default exchange + * <li>durable - set the default durablity + * </ul> + */ + abstract public String[] getElementsProcessed(); + + /** Performs configuration validation. */ + public void validateConfiguration() throws ConfigurationException + { + // Override in sub-classes + } + + public Configuration getConfig() + { + return _configuration; + } + + public <C extends ConfigurationPlugin> C getConfiguration(String plugin) + { + return (C) _pluginConfiguration.get(plugin); + } + + /** + * Sets the configuration for this plugin + * + * @param path + * @param configuration the configuration for this plugin. + */ + public void setConfiguration(String path, Configuration configuration) throws ConfigurationException + { + _configuration = configuration; + + // Extract a list of elements for processing + Iterator<?> keys = configuration.getKeys(); + + Set<String> elements = new HashSet<String>(); + while (keys.hasNext()) + { + String key = (String) keys.next(); + + int elementNameIndex = key.indexOf("."); + + String element = key.trim(); + if (elementNameIndex != -1) + { + element = key.substring(0, elementNameIndex).trim(); + } + + // Trim any element properties + elementNameIndex = element.indexOf("["); + if (elementNameIndex > 0) + { + element = element.substring(0, elementNameIndex).trim(); + } + + elements.add(element); + } + + //Remove the items we already expect in the configuration + for (String tag : getElementsProcessed()) + { + + // Work round the issue with Commons configuration. + // With an XMLConfiguration the key will be [@property] + // but with a CompositeConfiguration it will be @property]. + // Hide this issue from our users so when/if we change the + // configuration they don't have to. + int bracketIndex = tag.indexOf("["); + if (bracketIndex != -1) + { + tag = tag.substring(bracketIndex + 1, tag.length()); + } + + elements.remove(tag); + } + + if (_logger.isInfoEnabled()) + { + if (!elements.isEmpty()) + { + _logger.info("Elements to lookup:" + path); + for (String tag : elements) + { + _logger.info("Tag:'" + tag + "'"); + } + } + } + + // Process the elements in the configuration + for (String element : elements) + { + ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager(); + Configuration handled = element.length() == 0 ? configuration : configuration.subset(element); + + String configurationElement = element; + if (path.length() > 0) + { + configurationElement = path + "." + configurationElement; + } + + List<ConfigurationPlugin> handlers = configurationManager.getConfigurationPlugins(configurationElement, handled); + + if(_logger.isDebugEnabled()) + { + _logger.debug("For '" + element + "' found handlers (" + handlers.size() + "):" + handlers); + } + + for (ConfigurationPlugin plugin : handlers) + { + _pluginConfiguration.put(plugin.getClass().getName(), plugin); + } + } + + validateConfiguration(); + } + + /** Helper method to print out list of keys in a {@link Configuration}. */ + public static final void showKeys(Configuration config) + { + if (config.isEmpty()) + { + _logger.info("Configuration is empty"); + } + else + { + Iterator<?> keys = config.getKeys(); + while (keys.hasNext()) + { + String key = (String) keys.next(); + _logger.info("Configuration key: " + key); + } + } + } + + protected boolean hasConfiguration() + { + return _configuration != null; + } + + /// Getters + + protected double getDoubleValue(String property) + { + return getDoubleValue(property, 0.0); + } + + protected double getDoubleValue(String property, double defaultValue) + { + return _configuration.getDouble(property, defaultValue); + } + + protected long getLongValue(String property) + { + return getLongValue(property, 0); + } + + protected long getLongValue(String property, long defaultValue) + { + return _configuration.getLong(property, defaultValue); + } + + protected int getIntValue(String property) + { + return getIntValue(property, 0); + } + + protected int getIntValue(String property, int defaultValue) + { + return _configuration.getInt(property, defaultValue); + } + + protected String getStringValue(String property) + { + return getStringValue(property, null); + } + + protected String getStringValue(String property, String defaultValue) + { + return _configuration.getString(property, defaultValue); + } + + protected boolean getBooleanValue(String property) + { + return getBooleanValue(property, false); + } + + protected boolean getBooleanValue(String property, boolean defaultValue) + { + return _configuration.getBoolean(property, defaultValue); + } + + protected List getListValue(String property) + { + return getListValue(property, Collections.EMPTY_LIST); + } + + protected List getListValue(String property, List defaultValue) + { + return _configuration.getList(property, defaultValue); + } + + /// Validation Helpers + + protected boolean contains(String property) + { + return _configuration.getProperty(property) != null; + } + + /** + * Provide mechanism to validate Configuration contains a Postiive Long Value + * + * @param property + * + * @throws ConfigurationException + */ + protected void validatePositiveLong(String property) throws ConfigurationException + { + try + { + if (!containsPositiveLong(property)) + { + throw new ConfigurationException(this.getClass().getSimpleName() + + ": '" + property + + "' must be a Positive Long value."); + } + } + catch (Exception e) + { + Throwable last = e; + + // Find the first cause + if (e instanceof ConversionException) + { + Throwable t = e.getCause(); + while (t != null) + { + last = t; + t = last.getCause(); + } + } + + throw new ConfigurationException(this.getClass().getSimpleName() + + ": unable to configure invalid " + + property + ":" + + _configuration.getString(property), + last); + } + } + + protected boolean containsLong(String property) + { + try + { + _configuration.getLong(property); + return true; + } + catch (NoSuchElementException e) + { + return false; + } + } + + protected boolean containsPositiveLong(String property) + { + try + { + long value = _configuration.getLong(property); + return value > 0; + } + catch (NoSuchElementException e) + { + return false; + } + + } + + protected boolean containsInt(String property) + { + try + { + _configuration.getInt(property); + return true; + } + catch (NoSuchElementException e) + { + return false; + } + } + + protected boolean containsBoolean(String property) + { + try + { + _configuration.getBoolean(property); + return true; + } + catch (NoSuchElementException e) + { + return false; + } + } + + /** + * Given another configuration merge the configuration into our own config + * + * The new values being merged in will take precedence over existing values. + * + * In the simplistic case this means something like: + * + * So if we have configuration set + * name = 'fooo' + * + * And the new configuration contains a name then that will be reset. + * name = 'new' + * + * However this plugin will simply contain other plugins so the merge will + * be called until we end up at a base plugin that understand how to merge + * items. i.e Alerting values. Where the provided configuration will take + * precedence. + * + * @param configuration the config to merge in to our own. + */ + public void addConfiguration(ConfigurationPlugin configuration) + { + // If given configuration is null then there is nothing to process. + if (configuration == null) + { + return; + } + + // Merge all the sub configuration items + for (Map.Entry<String, ConfigurationPlugin> newPlugins : configuration._pluginConfiguration.entrySet()) + { + String key = newPlugins.getKey(); + ConfigurationPlugin config = newPlugins.getValue(); + + if (_pluginConfiguration.containsKey(key)) + { + //Merge the configuration if we already have this type of config + _pluginConfiguration.get(key).mergeConfiguration(config); + } + else + { + //otherwise just add it to our config. + _pluginConfiguration.put(key, config); + } + } + + //Merge the configuration itself + String key = configuration.getClass().getName(); + if (_pluginConfiguration.containsKey(key)) + { + //Merge the configuration if we already have this type of config + _pluginConfiguration.get(key).mergeConfiguration(configuration); + } + else + { + //If we are adding a configuration of our own type then merge + if (configuration.getClass() == this.getClass()) + { + mergeConfiguration(configuration); + } + else + { + // just store this in case someone else needs it. + _pluginConfiguration.put(key, configuration); + } + + } + + } + + protected void mergeConfiguration(ConfigurationPlugin configuration) + { + _configuration = configuration.getConfig(); + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + + sb.append("\n").append(getClass().getSimpleName()); + sb.append("=[ (").append(formatToString()).append(")"); + + for(Map.Entry<String,ConfigurationPlugin> item : _pluginConfiguration.entrySet()) + { + sb.append("\n").append(item.getValue()); + } + + sb.append("]\n"); + + return sb.toString(); + } + + public String formatToString() + { + return super.toString(); + } + +} + + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPluginFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPluginFactory.java new file mode 100644 index 0000000000..02560b296e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPluginFactory.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugins; + +import java.util.List; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; + +public interface ConfigurationPluginFactory +{ + /** + * The Parent paths of the configuration that this plugin supports. + * + * For example, {@code queue} elements have a parent path of {@code virtualhosts.virtualhost}. + */ + public List<String> getParentPaths(); + + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException; +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java new file mode 100644 index 0000000000..7a2632d923 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionConfiguration.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugins; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.ConversionException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList("virtualhosts.virtualhost.slow-consumer-detection"); + } + } + + //Set Default time unit to seconds + TimeUnit _timeUnit = TimeUnit.SECONDS; + + public String[] getElementsProcessed() + { + return new String[]{"delay", + "timeunit"}; + } + + public long getDelay() + { + return getLongValue("delay", 10); + } + + public TimeUnit getTimeUnit() + { + return _timeUnit; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + validatePositiveLong("delay"); + + String timeUnit = getStringValue("timeunit"); + + if (timeUnit != null) + { + try + { + _timeUnit = TimeUnit.valueOf(timeUnit.toUpperCase()); + } + catch (IllegalArgumentException iae) + { + throw new ConfigurationException("Unable to configure Slow Consumer Detection invalid TimeUnit:" + timeUnit); + } + } + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionPolicyConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionPolicyConfiguration.java new file mode 100644 index 0000000000..ca8dec851a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionPolicyConfiguration.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugins; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.List; + +public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionPolicyConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = new SlowConsumerDetectionPolicyConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"name"}; + } + + public String getPolicyName() + { + return getStringValue("name"); + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (getPolicyName() == null) + { + throw new ConfigurationException("No Slow consumer policy defined."); + } + } + + @Override + public String formatToString() + { + return "Policy:"+getPolicyName(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java new file mode 100644 index 0000000000..0638ea362f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugins; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin +{ + private SlowConsumerPolicyPlugin _policyPlugin; + + public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", + "virtualhosts.virtualhost.topics.slow-consumer-detection", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"messageAge", + "depth", + "messageCount"}; + } + + public long getMessageAge() + { + return getLongValue("messageAge"); + } + + public long getDepth() + { + return getLongValue("depth"); + } + + public long getMessageCount() + { + return getLongValue("messageCount"); + } + + public SlowConsumerPolicyPlugin getPolicy() + { + return _policyPlugin; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (!containsPositiveLong("messageAge") && + !containsPositiveLong("depth") && + !containsPositiveLong("messageCount")) + { + throw new ConfigurationException("At least one configuration property" + + "('messageAge','depth' or 'messageCount') must be specified."); + } + + SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); + + PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); + Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getSlowConsumerPlugins(); + + if (policyConfig == null) + { + throw new ConfigurationException("No Slow Consumer Policy specified. Known Policies:" + factories.keySet()); + } + + if (_logger.isDebugEnabled()) + { + Iterator<?> keys = policyConfig.getConfig().getKeys(); + + while (keys.hasNext()) + { + String key = (String) keys.next(); + + _logger.debug("Policy Keys:" + key); + } + + } + + SlowConsumerPolicyPluginFactory<SlowConsumerPolicyPlugin> pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); + + if (pluginFactory == null) + { + throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); + } + + _policyPlugin = pluginFactory.newInstance(policyConfig); + + // Debug the creation of this Config + _logger.debug(this); + } + + public String formatToString() + { + StringBuilder sb = new StringBuilder(); + if (getMessageAge() > 0) + { + sb.append("Age:").append(getMessageAge()).append(":"); + } + if (getDepth() > 0) + { + sb.append("Depth:").append(getDepth()).append(":"); + } + if (getMessageCount() > 0) + { + sb.append("Count:").append(getMessageCount()).append(":"); + } + + sb.append("Policy[").append(getPolicy()).append("]"); + return sb.toString(); + } +} |