diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/util')
24 files changed, 3827 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java new file mode 100644 index 0000000000..09478d4157 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java @@ -0,0 +1,695 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.*; + +/** + * CommandLineParser provides a utility for specifying the format of a command line and parsing command lines to ensure + * that they fit their specified format. A command line is made up of flags and options, both may be refered to as + * options. A flag is an option that does not take an argument (specifying it means it has the value 'true' and not + * specifying it means it has the value 'false'). Options must take arguments but they can be set up with defaults so + * that they take a default value when not set. Options may be mandatory in wich case it is an error not to specify + * them on the command line. Flags are never mandatory because they are implicitly set to false when not specified. + * + * <p/>Some example command lines are: + * + * <ul> + * <li>This one has two options that expect arguments: + * <pre> + * cruisecontrol -configfile cruisecontrol.xml -port 9000 + * </pre> + * <li>This has one no-arg flag and two 'free' arguments: + * <pre> + * zip -r project.zip project/* + * </pre> + * <li>This one concatenates multiple flags into a single block with only one '-': + * <pre> + * jar -tvf mytar.tar + * </pre> + * + * <p/>The parsing rules are: + * + * <ol> + * <li>Flags may be combined after a single '-' because they never take arguments. Normally such flags are single letter + * flags but this is only a convention and not enforced. Flags of more than one letter are usually specified on their own. + * <li>Options expecting arguments must always be on their own. + * <li>The argument to an option may be seperated from it by whitespace or appended directly onto the option. + * <li>The argument to an option may never begin with a '-' character. + * <li>All other arguments not beginning with a '-' character are free arguments that do not belong to any option. + * <li>The second or later of a set of duplicate or repeated flags are ignored. + * <li>Options are matched up to the shortest matching option. This is because of the possibility of having no space + * between an option and its argument. This rules out the possibility of using two options where one is an opening + * substring of the other. For example, the options "foo" and "foobar" cannot be used on the same command line because + * it is not possible to distinguish the argument "-foobar" from being the "foobar" option or the "foo" option with + * the "bar" argument. + * </ol> + * + * <p/>By default, unknown options are simply ignored if specified on the command line. This behaviour may be changed + * so that the parser reports all unknowns as errors by using the {@link #setErrorsOnUnknowns} method. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Accept a command line specification. + * <tr><td> Parse a command line into properties, validating it against its specification. + * <tr><td> Report all errors between a command line and its specification. + * <tr><td> Provide a formatted usage string for a command line. + * <tr><td> Provide a formatted options in force string for a command line. + * <tr><td> Allow errors on unknowns behaviour to be turned on or off. + * </table> + */ +public class CommandLineParser +{ + /** Holds a mapping from command line option names to detailed information about those options. */ + private Map<String, CommandLineOption> optionMap = new HashMap<String, CommandLineOption>(); + + /** Holds a list of parsing errors. */ + private List<String> parsingErrors = new ArrayList<String>(); + + /** Holds the regular expression matcher to match command line options with. */ + private Matcher optionMatcher = null; + + /** Holds the parsed command line properties after parsing. */ + private Properties parsedProperties = null; + + /** Flag used to indicate that errors should be created for unknown options. False by default. */ + private boolean errorsOnUnknowns = false; + + /** + * Creates a command line options parser from a command line specification. This is passed to this constructor + * as an array of arrays of strings. Each array of strings specifies the command line for a single option. A static + * array may therefore easily be used to configure the command line parser in a single method call with an easily + * readable format. + * + * <p/>Each array of strings must be 2, 3, 4 or 5 elements long. If any of the last three elements are missing they + * are assumed to be null. The elements specify the following parameters: + * <ol> + * <li>The name of the option without the leading '-'. For example, "file". To specify the format of the 'free' + * arguments use the option names "1", "2", ... and so on. + * <li>The option comment. A line of text describing the usage of the option. For example, "The file to be processed." + * <li>The options argument. This is a very short description of the argument to the option, often a single word + * or a reminder as to the arguments format. When this element is null the option is a flag and does not + * accept any arguments. For example, "filename" or "(unix | windows)" or null. The actual text specified + * is only used to print in the usage message to remind the user of the usage of the option. + * <li>The mandatory flag. When set to "true" an option must always be specified. Any other value, including null, + * means that the option is mandatory. Flags are always mandatory (see class javadoc for explanation of why) so + * this is ignored for flags. + * <li>A regular expression describing the format that the argument must take. Ignored if null. + * </ol> + * <p/>An example call to this constructor is: + * + * <pre> + * CommandLineParser commandLine = new CommandLineParser( + * new String[][] {{"file", "The file to be processed. ", "filename", "true"}, + * {"dir", "Directory to store results in. Current dir used if not set.", "out dir"}, + * {"os", "Operating system EOL format to use.", "(windows | unix)", null, "windows\|unix"}, + * {"v", "Verbose mode. Prints information about the processing as it goes."}, + * {"1", "The processing command to run.", "command", "true", "add\|remove\|list"}}); + * </pre> + * + * @param config The configuration as an array of arrays of strings. + */ + public CommandLineParser(String[][] config) + { + // Loop through all the command line option specifications creating details for each in the options map. + for (int i = 0; i < config.length; i++) + { + String[] nextOptionSpec = config[i]; + + addOption(nextOptionSpec[0], nextOptionSpec[1], (nextOptionSpec.length > 2) ? nextOptionSpec[2] : null, + (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false, + (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null); + } + } + + /** + * Lists all the parsing errors from the most recent parsing in a string. + * + * @return All the parsing errors from the most recent parsing. + */ + public String getErrors() + { + // Return the empty string if there are no errors. + if (parsingErrors.isEmpty()) + { + return ""; + } + + // Concatenate all the parsing errors together. + StringBuilder result = new StringBuilder(); + + for (String s : parsingErrors) + { + result.append(s); + } + + return result.toString(); + } + + /** + * Lists the properties set from the most recent parsing or an empty string if no parsing has been done yet. + * + * @return The properties set from the most recent parsing or an empty string if no parsing has been done yet. + */ + public String getOptionsInForce() + { + // Check if there are no properties to report and return and empty string if so. + if (parsedProperties == null) + { + return ""; + } + + // List all the properties. + StringBuilder result = new StringBuilder("Options in force:\n"); + + for (Map.Entry<Object, Object> property : parsedProperties.entrySet()) + { + result.append(property.getKey()) + .append(" = ") + .append(property.getValue()) + .append('\n'); + } + + return result.toString(); + } + + /** + * Generates a usage string consisting of the name of each option and each options argument description and + * comment. + * + * @return A usage string for all the options. + */ + public String getUsage() + { + String result = "Options:\n"; + + // Print usage on each of the command line options. + for (CommandLineOption optionInfo : optionMap.values()) + { + result += + "-" + optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "") + + optionInfo.comment + "\n"; + } + + return result; + } + + /** + * Control the behaviour of the errors on unkowns reporting. When turned on this reports all unkowns options + * as errors. When turned off, all unknowns are simply ignored. + * + * @param errors The setting of the errors on unkown flag. True to turn it on. + */ + public void setErrorsOnUnknowns(boolean errors) + { + errorsOnUnknowns = errors; + } + + /** + * Parses a set of command line arguments into a set of properties, keyed by the argument flag. The free arguments + * are keyed by integers as strings starting at "1" and then "2", ... and so on. + * + * <p/>See the class level comment for a description of the parsing rules. + * + * @param args The command line arguments. + * + * @return The arguments as a set of properties. + * + * @throws IllegalArgumentException If the command line cannot be parsed against its specification. If this exception + * is thrown a call to {@link #getErrors} will provide a diagnostic of the command + * line errors. + */ + public Properties parseCommandLine(String[] args) throws IllegalArgumentException + { + Properties options = new Properties(); + + // Used to keep count of the current 'free' argument. + int free = 1; + + // Used to indicate that the most recently parsed option is expecting arguments. + boolean expectingArgs = false; + + // The option that is expecting arguments from the next element of the command line. + String optionExpectingArgs = null; + + // Used to indicate that the most recently parsed option is a duplicate and should be ignored. + boolean ignore = false; + + // Create the regular expression matcher for the command line options. + StringBuilder regexp = new StringBuilder("^("); + int optionsAdded = 0; + + for (Iterator<String> i = optionMap.keySet().iterator(); i.hasNext();) + { + String nextOption = i.next(); + + // Check that the option is not a free argument definition. + boolean notFree = false; + + try + { + Integer.parseInt(nextOption); + } + catch (NumberFormatException e) + { + notFree = true; + } + + // Add the option to the regular expression matcher if it is not a free argument definition. + if (notFree) + { + regexp.append(nextOption) + .append(i.hasNext() ? "|" : ""); + optionsAdded++; + } + } + + // There has to be more that one option in the regular expression or else the compiler complains that the close + // cannot be nullable if the '?' token is used to make the matched option string optional. + regexp.append(')') + .append(((optionsAdded > 0) ? "?" : "")) + .append("(.*)"); + Pattern pattern = Pattern.compile(regexp.toString()); + + // Loop through all the command line arguments. + for (int i = 0; i < args.length; i++) + { + // Check if the next command line argument begins with a '-' character and is therefore the start of + // an option. + if (args[i].startsWith("-")) + { + // Extract the value of the option without the leading '-'. + String arg = args[i].substring(1); + + // Match up to the longest matching option. + optionMatcher = pattern.matcher(arg); + optionMatcher.matches(); + + String matchedOption = optionMatcher.group(1); + + // Match any argument directly appended onto the longest matching option. + String matchedArg = optionMatcher.group(2); + + // Check that a known option was matched. + if ((matchedOption != null) && !"".equals(matchedOption)) + { + // Get the command line option information for the matched option. + CommandLineOption optionInfo = optionMap.get(matchedOption); + + // Check if this option is expecting arguments. + if (optionInfo.expectsArgs) + { + // The option is expecting arguments so swallow the next command line argument as an + // argument to this option. + expectingArgs = true; + optionExpectingArgs = matchedOption; + + // In the mean time set this options argument to the empty string in case no argument is ever + // supplied. + // options.put(matchedOption, ""); + } + + // Check if the option was matched on its own and is a flag in which case set that flag. + if ("".equals(matchedArg) && !optionInfo.expectsArgs) + { + options.put(matchedOption, "true"); + } + // The option was matched as a substring with its argument appended to it or is a flag that is + // condensed together with other flags. + else if (!"".equals(matchedArg)) + { + // Check if the option is a flag and therefore is allowed to be condensed together + // with other flags. + if (!optionInfo.expectsArgs) + { + // Set the first matched flag. + options.put(matchedOption, "true"); + + // Repeat the longest matching process on the remainder but ensure that the remainder + // consists only of flags as only flags may be condensed together in this fashion. + do + { + // Match the remainder against the options. + optionMatcher = pattern.matcher(matchedArg); + optionMatcher.matches(); + + matchedOption = optionMatcher.group(1); + matchedArg = optionMatcher.group(2); + + // Check that an option was matched. + if (matchedOption != null) + { + // Get the command line option information for the next matched option. + optionInfo = optionMap.get(matchedOption); + + // Ensure that the next option is a flag or raise an error if not. + if (optionInfo.expectsArgs == true) + { + parsingErrors.add("Option " + matchedOption + " cannot be combined with flags.\n"); + } + + options.put(matchedOption, "true"); + } + // The remainder could not be matched against a flag it is either an unknown flag + // or an illegal argument to a flag. + else + { + parsingErrors.add("Illegal argument to a flag in the option " + arg + "\n"); + + break; + } + } + // Continue until the remainder of the argument has all been matched with flags. + while (!"".equals(matchedArg)); + } + // The option is expecting an argument, so store the unmatched portion against it + // as its argument. + else + { + // Check the arguments format is correct against any specified format. + checkArgumentFormat(optionInfo, matchedArg); + + // Store the argument against its option (regardless of its format). + options.put(matchedOption, matchedArg); + + // The argument to this flag has already been supplied to it. Do not swallow the + // next command line argument as an argument to this flag. + expectingArgs = false; + } + } + } + else // No matching option was found. + { + // Add this to the list of parsing errors if errors on unkowns is being used. + if (errorsOnUnknowns) + { + parsingErrors.add("Option " + matchedOption + " is not a recognized option.\n"); + } + } + } + // The command line argument did not being with a '-' so it is an argument to the previous flag or it + // is a free argument. + else + { + // Check if a previous flag is expecting to swallow this next argument as its argument. + if (expectingArgs) + { + // Get the option info for the option waiting for arguments. + CommandLineOption optionInfo = optionMap.get(optionExpectingArgs); + + // Check the arguments format is correct against any specified format. + checkArgumentFormat(optionInfo, args[i]); + + // Store the argument against its option (regardless of its format). + options.put(optionExpectingArgs, args[i]); + + // Clear the expecting args flag now that the argument has been swallowed. + expectingArgs = false; + optionExpectingArgs = null; + } + // This command line option is not an argument to any option. Add it to the set of 'free' options. + else + { + // Get the option info for the free option, if there is any. + CommandLineOption optionInfo = optionMap.get(Integer.toString(free)); + + if (optionInfo != null) + { + // Check the arguments format is correct against any specified format. + checkArgumentFormat(optionInfo, args[i]); + } + + // Add to the list of free options. + options.put(Integer.toString(free), args[i]); + + // Move on to the next free argument. + free++; + } + } + } + + // Scan through all the specified options to check that all mandatory options have been set and that all flags + // that were not set are set to false in the set of properties. + for (CommandLineOption optionInfo : optionMap.values()) + { + // Check if this is a flag. + if (!optionInfo.expectsArgs) + { + // Check if the flag is not set in the properties and set it to false if so. + if (!options.containsKey(optionInfo.option)) + { + options.put(optionInfo.option, "false"); + } + } + // Check if this is a mandatory option and was not set. + else if (optionInfo.mandatory && !options.containsKey(optionInfo.option)) + { + // Create an error for the missing option. + parsingErrors.add("Option -" + optionInfo.option + " is mandatory but not was not specified.\n"); + } + } + + // Check if there were any errors. + if (!parsingErrors.isEmpty()) + { + // Throw an illegal argument exception to signify that there were parsing errors. + throw new IllegalArgumentException(); + } + + // Convert any name/value pairs in the free arguments into properties in the parsed options. + options = takeFreeArgsAsProperties(options, 1); + + parsedProperties = options; + + return options; + } + + /** + * If a command line has been parsed, calling this method sets all of its parsed options into the specified properties. + */ + public void addCommandLineToProperties(Properties properties) + { + if (parsedProperties != null) + { + for (Object propKey : parsedProperties.keySet()) + { + String name = (String) propKey; + String value = parsedProperties.getProperty(name); + + properties.setProperty(name, value); + } + } + } + + /** + * Resets this command line parser after it has been used to parse a command line. This method will only need + * to be called to use this parser a second time which is not likely seeing as a command line is usually only + * specified once. However, it is exposed as a public method for the rare case where this may be done. + * + * <p/>Cleans the internal state of this parser, removing all stored errors and information about the options in + * force. + */ + public void reset() + { + parsingErrors = new ArrayList<String>(); + parsedProperties = null; + } + + /** + * Adds the option to list of available command line options. + * + * @param option The option to add as an available command line option. + * @param comment A comment for the option. + * @param argument The text that appears after the option in the usage string. + * @param mandatory When true, indicates that this option is mandatory. + * @param formatRegexp The format that the argument must take, defined as a regular expression. + */ + protected void addOption(String option, String comment, String argument, boolean mandatory, String formatRegexp) + { + // Check if usage text has been set in which case this option is expecting arguments. + boolean expectsArgs = ((argument == null) || argument.equals("")) ? false : true; + + // Add the option to the map of command line options. + CommandLineOption opt = new CommandLineOption(option, expectsArgs, comment, argument, mandatory, formatRegexp); + optionMap.put(option, opt); + } + + /** + * Converts the free arguments into property declarations. After parsing the command line the free arguments + * are numbered from 1, such that the parsed properties contain values for the keys "1", "2", ... This method + * converts any free arguments declared using the 'name=value' syntax into properties with key 'name', value + * 'value'. + * + * <p/>For example the comand line: + * <pre> + * ... debug=true + * </pre> + * + * <p/>After parsing has properties: + * <pre>[[1, debug=true]]</pre> + * + * <p/>After applying this method the properties are: + * <pre>[[1, debug=true], [debug, true]]</pre> + * + * @param properties The parsed command line properties. + * @param from The free argument index to convert to properties from. + * + * @return The parsed command line properties, with free argument name value pairs too. + */ + private Properties takeFreeArgsAsProperties(Properties properties, int from) + { + for (int i = from; true; i++) + { + String nextFreeArg = properties.getProperty(Integer.toString(i)); + + // Terminate the loop once all free arguments have been consumed. + if (nextFreeArg == null) + { + break; + } + + // Split it on the =, strip any whitespace and set it as a system property. + String[] nameValuePair = nextFreeArg.split("="); + + if (nameValuePair.length == 2) + { + properties.setProperty(nameValuePair[0], nameValuePair[1]); + } + } + + return properties; + } + + /** + * Checks the format of an argument to an option against its specified regular expression format if one has + * been set. Any errors are added to the list of parsing errors. + * + * @param optionInfo The command line option information for the option which is havings its argument checked. + * @param matchedArg The string argument to the option. + */ + private void checkArgumentFormat(CommandLineOption optionInfo, String matchedArg) + { + // Check if this option enforces a format for its argument. + if (optionInfo.argumentFormatRegexp != null) + { + Pattern pattern = Pattern.compile(optionInfo.argumentFormatRegexp); + Matcher argumentMatcher = pattern.matcher(matchedArg); + + // Check if the argument does not meet its required format. + if (!argumentMatcher.matches()) + { + // Create an error for this badly formed argument. + parsingErrors.add("The argument to option -" + optionInfo.option + " does not meet its required format.\n"); + } + } + } + + /** + * Extracts all name=value pairs from the command line, sets them all as system properties and also returns + * a map of properties containing them. + * + * @param args The command line. + * @param commandLine The command line parser. + * @param properties The properties object to inject all parsed properties into (optional may be <tt>null</tt>). + * + * @return A set of properties containing all name=value pairs from the command line. + */ + public static Properties processCommandLine(String[] args, CommandLineParser commandLine, Properties properties) + { + // Capture the command line arguments or display errors and correct usage and then exit. + Properties options = null; + + try + { + options = commandLine.parseCommandLine(args); + + // Add all the trailing command line options (name=value pairs) to system properties. They may be picked up + // from there. + commandLine.addCommandLineToProperties(properties); + } + catch (IllegalArgumentException e) + { + System.out.println(commandLine.getErrors()); + System.out.println(commandLine.getUsage()); + System.exit(1); + } + + return options; + } + + /** + * Holds information about a command line options. This includes what its name is, whether or not it is a flag, + * whether or not it is mandatory, what its user comment is, what its argument reminder text is and what its + * regular expression format is. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Hold details of a command line option. + * </table> + */ + protected static class CommandLineOption + { + /** Holds the text for the flag to match this argument with. */ + public String option = null; + + /** Holds a string describing how to use this command line argument. */ + public String argument = null; + + /** Flag that determines whether or not this command line argument can take arguments. */ + public boolean expectsArgs = false; + + /** Holds a short comment describing what this command line argument is for. */ + public String comment = null; + + /** Flag that determines whether or not this is an mandatory command line argument. */ + public boolean mandatory = false; + + /** A regular expression describing what format the argument to this option muist have. */ + public String argumentFormatRegexp = null; + + /** + * Create a command line option object that holds specific information about a command line option. + * + * @param option The text that matches the option. + * @param expectsArgs Whether or not the option expects arguments. It is a flag if this is false. + * @param comment A comment explaining how to use this option. + * @param argument A short reminder of the format of the argument to this option/ + * @param mandatory Set to true if this option is mandatory. + * @param formatRegexp The regular expression that the argument to this option must meet to be valid. + */ + public CommandLineOption(String option, boolean expectsArgs, String comment, String argument, boolean mandatory, + String formatRegexp) + { + this.option = option; + this.expectsArgs = expectsArgs; + this.comment = comment; + this.argument = argument; + this.mandatory = mandatory; + this.argumentFormatRegexp = formatRegexp; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java new file mode 100644 index 0000000000..633cf4fe3a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E> +{ + private static final Logger _logger = LoggerFactory.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class); + + protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>(); + + protected AtomicInteger _messageHeadSize = new AtomicInteger(0); + + @Override + public int size() + { + return super.size() + _messageHeadSize.get(); + } + + public int headSize() + { + return _messageHeadSize.get(); + } + + @Override + public E poll() + { + if (_messageHead.isEmpty()) + { + return super.poll(); + } + else + { + E e = _messageHead.poll(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Providing item(" + e + ")from message head"); + } + + if (e != null) + { + _messageHeadSize.decrementAndGet(); + } + + return e; + } + } + + @Override + public boolean remove(Object o) + { + + if (_messageHead.isEmpty()) + { + return super.remove(o); + } + else + { + if (_messageHead.remove(o)) + { + _messageHeadSize.decrementAndGet(); + + return true; + } + + return super.remove(o); + } + } + + @Override + public boolean removeAll(Collection<?> c) + { + if (_messageHead.isEmpty()) + { + return super.removeAll(c); + } + else + { + // fixme this is super.removeAll but iterator here doesn't work + // we need to be able to correctly decrement _messageHeadSize + // boolean modified = false; + // Iterator<?> e = iterator(); + // while (e.hasNext()) + // { + // if (c.contains(e.next())) + // { + // e.remove(); + // modified = true; + // _size.decrementAndGet(); + // } + // } + // return modified; + + throw new RuntimeException("Not implemented"); + } + } + + @Override + public boolean isEmpty() + { + return (_messageHead.isEmpty() && super.isEmpty()); + } + + @Override + public void clear() + { + super.clear(); + _messageHead.clear(); + } + + @Override + public boolean contains(Object o) + { + return _messageHead.contains(o) || super.contains(o); + } + + @Override + public boolean containsAll(Collection<?> o) + { + return _messageHead.containsAll(o) || super.containsAll(o); + } + + @Override + public E element() + { + if (_messageHead.isEmpty()) + { + return super.element(); + } + else + { + return _messageHead.element(); + } + } + + @Override + public E peek() + { + if (_messageHead.isEmpty()) + { + return super.peek(); + } + else + { + E o = _messageHead.peek(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Peeking item (" + o + ") from message head"); + } + + return o; + } + + } + + @Override + public Iterator<E> iterator() + { + final Iterator<E> mainMessageIterator = super.iterator(); + + return new Iterator<E>() + { + final Iterator<E> _headIterator = _messageHead.iterator(); + final Iterator<E> _mainIterator = mainMessageIterator; + + Iterator<E> last; + + public boolean hasNext() + { + return _headIterator.hasNext() || _mainIterator.hasNext(); + } + + public E next() + { + if (_headIterator.hasNext()) + { + last = _headIterator; + + return _headIterator.next(); + } + else + { + last = _mainIterator; + + return _mainIterator.next(); + } + } + + public void remove() + { + last.remove(); + if(last == _mainIterator) + { + _size.decrementAndGet(); + } + else + { + _messageHeadSize.decrementAndGet(); + } + } + }; + } + + @Override + public boolean retainAll(Collection<?> c) + { + throw new RuntimeException("Not Implemented"); + } + + @Override + public Object[] toArray() + { + throw new RuntimeException("Not Implemented"); + } + + public boolean pushHead(E o) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Adding item(" + o + ") to head of queue"); + } + + if (_messageHead.offer(o)) + { + _messageHeadSize.incrementAndGet(); + + return true; + } + + return false; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java new file mode 100644 index 0000000000..c4d7683a02 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConcurrentLinkedQueueAtomicSize<E> extends ConcurrentLinkedQueue<E> +{ + AtomicInteger _size = new AtomicInteger(0); + + public int size() + { + return _size.get(); + } + + public boolean offer(E o) + { + + if (super.offer(o)) + { + _size.incrementAndGet(); + return true; + } + + return false; + } + + public E poll() + { + E e = super.poll(); + + if (e != null) + { + _size.decrementAndGet(); + } + + return e; + } + + @Override + public boolean remove(Object o) + { + if (super.remove(o)) + { + _size.decrementAndGet(); + return true; + } + + return false; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java new file mode 100644 index 0000000000..1f168345a1 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.concurrent.ConcurrentLinkedQueue; + +public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E> +{ + public int size() + { + if (isEmpty()) + { + return 0; + } + else + { + return 1; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java new file mode 100644 index 0000000000..1a57af9bf7 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -0,0 +1,395 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.LinkedList; +import java.util.List; + +/** + * FileUtils provides some simple helper methods for working with files. It follows the convention of wrapping all + * checked exceptions as runtimes, so code using these methods is free of try-catch blocks but does not expect to + * recover from errors. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Read a text file as a string. + * <tr><td> Open a file or default resource as an input stream. + * </table> + */ +public class FileUtils +{ + /** + * Reads a text file as a string. + * + * @param filename The name of the file. + * + * @return The contents of the file. + */ + public static String readFileAsString(String filename) + { + BufferedInputStream is = null; + + try + { + try + { + is = new BufferedInputStream(new FileInputStream(filename)); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + + return readStreamAsString(is); + } + finally + { + if (is != null) + { + try + { + is.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + } + + /** + * Reads a text file as a string. + * + * @param file The file. + * + * @return The contents of the file. + */ + public static String readFileAsString(File file) + { + BufferedInputStream is = null; + + try + { + is = new BufferedInputStream(new FileInputStream(file)); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e); + } + + return readStreamAsString(is); + } + + /** + * Reads the contents of a reader, one line at a time until the end of stream is encountered, and returns all + * together as a string. + * + * @param is The reader. + * + * @return The contents of the reader. + */ + private static String readStreamAsString(BufferedInputStream is) + { + try + { + byte[] data = new byte[4096]; + + StringBuffer inBuffer = new StringBuffer(); + + String line; + int read; + + while ((read = is.read(data)) != -1) + { + String s = new String(data, 0, read); + inBuffer.append(s); + } + + return inBuffer.toString(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /** + * Either opens the specified filename as an input stream, or uses the default resource loaded using the + * specified class loader, if opening the file fails or no file name is specified. + * + * @param filename The name of the file to open. + * @param defaultResource The name of the default resource on the classpath if the file cannot be opened. + * @param cl The classloader to load the default resource with. + * + * @return An input stream for the file or resource, or null if one could not be opened. + */ + public static InputStream openFileOrDefaultResource(String filename, String defaultResource, ClassLoader cl) + { + InputStream is = null; + + // Flag to indicate whether the default resource should be used. By default this is true, so that the default + // is used when opening the file fails. + boolean useDefault = true; + + // Try to open the file if one was specified. + if (filename != null) + { + try + { + is = new BufferedInputStream(new FileInputStream(new File(filename))); + + // Clear the default flag because the file was succesfully opened. + useDefault = false; + } + catch (FileNotFoundException e) + { + // Ignore this exception, the default will be used instead. + } + } + + // Load the default resource if a file was not specified, or if opening the file failed. + if (useDefault) + { + is = cl.getResourceAsStream(defaultResource); + } + + return is; + } + + /** + * Copies the specified source file to the specified destintaion file. If the destinationst file does not exist, + * it is created. + * + * @param src The source file name. + * @param dst The destination file name. + */ + public static void copy(File src, File dst) + { + try + { + copyCheckedEx(src, dst); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /** + * Copies the specified source file to the specified destination file. If the destination file does not exist, + * it is created. + * + * @param src The source file name. + * @param dst The destination file name. + * @throws IOException + */ + public static void copyCheckedEx(File src, File dst) throws IOException + { + InputStream in = new FileInputStream(src); + try + { + if (!dst.exists()) + { + dst.createNewFile(); + } + + OutputStream out = new FileOutputStream(dst); + + try + { + // Transfer bytes from in to out + byte[] buf = new byte[1024]; + int len; + while ((len = in.read(buf)) > 0) + { + out.write(buf, 0, len); + } + } + finally + { + out.close(); + } + } + finally + { + in.close(); + } + } + + /* + * Deletes a given file + */ + public static boolean deleteFile(String filePath) + { + return delete(new File(filePath), false); + } + + /* + * Deletes a given empty directory + */ + public static boolean deleteDirectory(String directoryPath) + { + File directory = new File(directoryPath); + + if (directory.isDirectory()) + { + if (directory.listFiles().length == 0) + { + return delete(directory, true); + } + } + + return false; + } + + /** + * Delete a given file/directory, + * A directory will always require the recursive flag to be set. + * if a directory is specified and recursive set then delete the whole tree + * + * @param file the File object to start at + * @param recursive boolean to recurse if a directory is specified. + * + * @return <code>true</code> if and only if the file or directory is + * successfully deleted; <code>false</code> otherwise + */ + public static boolean delete(File file, boolean recursive) + { + boolean success = true; + + if (file.isDirectory()) + { + if (recursive) + { + File[] files = file.listFiles(); + + // This can occur if the file is deleted outside the JVM + if (files == null) + { + return false; + } + + for (int i = 0; i < files.length; i++) + { + success = delete(files[i], true) && success; + } + + return success && file.delete(); + } + + return false; + } + + return file.delete(); + } + + public static class UnableToCopyException extends Exception + { + UnableToCopyException(String msg) + { + super(msg); + } + } + + public static void copyRecursive(File source, File dst) throws FileNotFoundException, UnableToCopyException + { + + if (!source.exists()) + { + throw new FileNotFoundException("Unable to copy '" + source.toString() + "' as it does not exist."); + } + + if (dst.exists() && !dst.isDirectory()) + { + throw new IllegalArgumentException("Unable to copy '" + source.toString() + "' to '" + dst + "' a file with same name exists."); + } + + if (source.isFile()) + { + copy(source, dst); + } + + //else we have a source directory + if (!dst.isDirectory() && !dst.mkdirs()) + { + throw new UnableToCopyException("Unable to create destination directory"); + } + + for (File file : source.listFiles()) + { + if (file.isFile()) + { + copy(file, new File(dst.toString() + File.separator + file.getName())); + } + else + { + copyRecursive(file, new File(dst + File.separator + file.getName())); + } + } + + } + + /** + * Checks the specified file for instances of the search string. + * + * @param file the file to search + * @param search the search String + * + * @throws java.io.IOException + * @return the list of matching entries + */ + public static List<String> searchFile(File file, String search) + throws IOException + { + + List<String> results = new LinkedList<String>(); + + BufferedReader reader = new BufferedReader(new FileReader(file)); + try + { + while (reader.ready()) + { + String line = reader.readLine(); + if (line.contains(search)) + { + results.add(line); + } + } + } + finally + { + reader.close(); + } + + return results; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java new file mode 100644 index 0000000000..b5efaa61b6 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.util; + +import java.util.Queue; + +/** + * Defines a queue that has a push operation to add an element to the head of the queue. + * + * @todo Seems like this may be pointless, the implementation uses this method to increment the message count + * then calls offer. Why not simply override offer and drop this interface? + */ +public interface MessageQueue<E> extends Queue<E> +{ + /** + * Inserts the specified element into this queue, if possible. When using queues that may impose insertion + * restrictions (for example capacity bounds), method offer is generally preferable to method Collection.add(E), + * which can fail to insert an element only by throwing an exception. + * + * @param o The element to insert. + * + * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt> + */ + boolean pushHead(E o); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java b/qpid/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java new file mode 100644 index 0000000000..e764c8536b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.nio.ByteBuffer; +import java.util.UUID; + + +/** + * NameUUIDGen + * + */ + +public final class NameUUIDGen implements UUIDGen +{ + + private static final int WIDTH = 8; + + final private byte[] seed; + final private ByteBuffer seedBuf; + private long counter; + + public NameUUIDGen() + { + String namespace = UUID.randomUUID().toString(); + this.seed = new byte[namespace.length() + WIDTH]; + for (int i = WIDTH; i < seed.length; i++) + { + seed[i] = (byte) namespace.charAt(i - WIDTH); + } + this.seedBuf = ByteBuffer.wrap(seed); + this.counter = 0; + } + + public UUID generate() + { + seedBuf.putLong(0, counter++); + return UUID.nameUUIDFromBytes(seed); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java new file mode 100644 index 0000000000..4c653e6ca0 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java @@ -0,0 +1,264 @@ +/*********************************************************************** + * Copyright (c) 2000-2006 The Apache Software Foundation. * + * All rights reserved. * + * ------------------------------------------------------------------- * + * Licensed under the Apache License, Version 2.0 (the "License"); you * + * may not use this file except in compliance with the License. You * + * may obtain a copy of the License at: * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * + * implied. See the License for the specific language governing * + * permissions and limitations under the License. * + ***********************************************************************/ + +package org.apache.qpid.util; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; + +public class NetMatcher +{ + private ArrayList networks; + + public void initInetNetworks(final Collection nets) + { + networks = new ArrayList(); + for (Iterator iter = nets.iterator(); iter.hasNext(); ) try + { + InetNetwork net = InetNetwork.getFromString((String) iter.next()); + if (!networks.contains(net)) networks.add(net); + } + catch (java.net.UnknownHostException uhe) + { + log("Cannot resolve address: " + uhe.getMessage()); + } + networks.trimToSize(); + } + + public void initInetNetworks(final String[] nets) + { + networks = new ArrayList(); + for (int i = 0; i < nets.length; i++) try + { + InetNetwork net = InetNetwork.getFromString(nets[i]); + if (!networks.contains(net)) networks.add(net); + } + catch (java.net.UnknownHostException uhe) + { + log("Cannot resolve address: " + uhe.getMessage()); + } + networks.trimToSize(); + } + + public boolean matchInetNetwork(final String hostIP) + { + InetAddress ip = null; + + try + { + ip = InetAddress.getByName(hostIP); + } + catch (java.net.UnknownHostException uhe) + { + log("Cannot resolve address for " + hostIP + ": " + uhe.getMessage()); + } + + boolean sameNet = false; + + if (ip != null) for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); ) + { + InetNetwork network = (InetNetwork) iter.next(); + sameNet = network.contains(ip); + } + return sameNet; + } + + public boolean matchInetNetwork(final InetAddress ip) + { + boolean sameNet = false; + + for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); ) + { + InetNetwork network = (InetNetwork) iter.next(); + sameNet = network.contains(ip); + } + return sameNet; + } + + public NetMatcher() + { + } + + public NetMatcher(final String[] nets) + { + initInetNetworks(nets); + } + + public NetMatcher(final Collection nets) + { + initInetNetworks(nets); + } + + public String toString() { + return networks.toString(); + } + + protected void log(String s) { } +} + +class InetNetwork +{ + /* + * Implements network masking, and is compatible with RFC 1518 and + * RFC 1519, which describe CIDR: Classless Inter-Domain Routing. + */ + + private InetAddress network; + private InetAddress netmask; + + public InetNetwork(InetAddress ip, InetAddress netmask) + { + network = maskIP(ip, netmask); + this.netmask = netmask; + } + + public boolean contains(final String name) throws java.net.UnknownHostException + { + return network.equals(maskIP(InetAddress.getByName(name), netmask)); + } + + public boolean contains(final InetAddress ip) + { + return network.equals(maskIP(ip, netmask)); + } + + public String toString() + { + return network.getHostAddress() + "/" + netmask.getHostAddress(); + } + + public int hashCode() + { + return maskIP(network, netmask).hashCode(); + } + + public boolean equals(Object obj) + { + return (obj != null) && (obj instanceof InetNetwork) && + ((((InetNetwork)obj).network.equals(network)) && (((InetNetwork)obj).netmask.equals(netmask))); + } + + public static InetNetwork getFromString(String netspec) throws java.net.UnknownHostException + { + if (netspec.endsWith("*")) netspec = normalizeFromAsterisk(netspec); + else + { + int iSlash = netspec.indexOf('/'); + if (iSlash == -1) netspec += "/255.255.255.255"; + else if (netspec.indexOf('.', iSlash) == -1) netspec = normalizeFromCIDR(netspec); + } + + return new InetNetwork(InetAddress.getByName(netspec.substring(0, netspec.indexOf('/'))), + InetAddress.getByName(netspec.substring(netspec.indexOf('/') + 1))); + } + + public static InetAddress maskIP(final byte[] ip, final byte[] mask) + { + try + { + return getByAddress(new byte[] + { + (byte) (mask[0] & ip[0]), + (byte) (mask[1] & ip[1]), + (byte) (mask[2] & ip[2]), + (byte) (mask[3] & ip[3]) + }); + } + catch(Exception _) {} + { + return null; + } + } + + public static InetAddress maskIP(final InetAddress ip, final InetAddress mask) + { + return maskIP(ip.getAddress(), mask.getAddress()); + } + + /* + * This converts from an uncommon "wildcard" CIDR format + * to "address + mask" format: + * + * * => 000.000.000.0/000.000.000.0 + * xxx.* => xxx.000.000.0/255.000.000.0 + * xxx.xxx.* => xxx.xxx.000.0/255.255.000.0 + * xxx.xxx.xxx.* => xxx.xxx.xxx.0/255.255.255.0 + */ + static private String normalizeFromAsterisk(final String netspec) + { + String[] masks = { "0.0.0.0/0.0.0.0", "0.0.0/255.0.0.0", "0.0/255.255.0.0", "0/255.255.255.0" }; + char[] srcb = netspec.toCharArray(); + int octets = 0; + for (int i = 1; i < netspec.length(); i++) { + if (srcb[i] == '.') octets++; + } + return (octets == 0) ? masks[0] : netspec.substring(0, netspec.length() -1 ).concat(masks[octets]); + } + + /* + * RFC 1518, 1519 - Classless Inter-Domain Routing (CIDR) + * This converts from "prefix + prefix-length" format to + * "address + mask" format, e.g. from xxx.xxx.xxx.xxx/yy + * to xxx.xxx.xxx.xxx/yyy.yyy.yyy.yyy. + */ + static private String normalizeFromCIDR(final String netspec) + { + final int bits = 32 - Integer.parseInt(netspec.substring(netspec.indexOf('/')+1)); + final int mask = (bits == 32) ? 0 : 0xFFFFFFFF - ((1 << bits)-1); + + return netspec.substring(0, netspec.indexOf('/') + 1) + + Integer.toString(mask >> 24 & 0xFF, 10) + "." + + Integer.toString(mask >> 16 & 0xFF, 10) + "." + + Integer.toString(mask >> 8 & 0xFF, 10) + "." + + Integer.toString(mask >> 0 & 0xFF, 10); + } + + private static java.lang.reflect.Method getByAddress = null; + + static { + try { + Class inetAddressClass = Class.forName("java.net.InetAddress"); + Class[] parameterTypes = { byte[].class }; + getByAddress = inetAddressClass.getMethod("getByAddress", parameterTypes); + } catch (Exception e) { + getByAddress = null; + } + } + + private static InetAddress getByAddress(byte[] ip) throws java.net.UnknownHostException + { + InetAddress addr = null; + if (getByAddress != null) try { + addr = (InetAddress) getByAddress.invoke(null, new Object[] { ip }); + } catch (IllegalAccessException e) { + } catch (java.lang.reflect.InvocationTargetException e) { + } + + if (addr == null) { + addr = InetAddress.getByName + ( + Integer.toString(ip[0] & 0xFF, 10) + "." + + Integer.toString(ip[1] & 0xFF, 10) + "." + + Integer.toString(ip[2] & 0xFF, 10) + "." + + Integer.toString(ip[3] & 0xFF, 10) + ); + } + return addr; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java new file mode 100644 index 0000000000..93266f2486 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +/** + * Contains pretty printing convenienve methods for producing formatted logging output, mostly for debugging purposes. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + * + * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays. + */ +public class PrettyPrintingUtils +{ + /** + * Pretty prints an array of ints as a string. + * + * @param array The array to pretty print. + * + * @return The pretty printed string. + */ + public static String printArray(int[] array) + { + StringBuilder result = new StringBuilder("["); + for (int i = 0; i < array.length; i++) + { + result.append(array[i]) + .append((i < (array.length - 1)) ? ", " : ""); + } + + result.append(']'); + + return result.toString(); + } + + /** + * Pretty prints an array of strings as a string. + * + * @param array The array to pretty print. + * + * @return The pretty printed string. + */ + public static String printArray(String[] array) + { + String result = "["; + for (int i = 0; i < array.length; i++) + { + result += array[i]; + result += (i < (array.length - 1)) ? ", " : ""; + } + + result += "]"; + + return result; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java b/qpid/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java new file mode 100644 index 0000000000..60b402a105 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.util.UUID; + + +/** + * RandomUUIDGen + * + */ + +public final class RandomUUIDGen implements UUIDGen +{ + + public UUID generate() + { + return UUID.randomUUID(); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java new file mode 100644 index 0000000000..8ad9d00f54 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java @@ -0,0 +1,108 @@ +package org.apache.qpid.util; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.Comparator; + +import org.apache.qpid.SerialException; + +/** + * This class provides basic serial number comparisons as defined in + * RFC 1982. + */ + +public class Serial +{ + + public static final Comparator<Integer> COMPARATOR = new Comparator<Integer>() + { + public int compare(Integer s1, Integer s2) + { + return Serial.compare(s1, s2); + } + }; + + /** + * Compares two numbers using serial arithmetic. + * + * @param s1 the first serial number + * @param s2 the second serial number + * + * @return a negative integer, zero, or a positive integer as the + * first argument is less than, equal to, or greater than the + * second + */ + public static final int compare(int s1, int s2) + { + return s1 - s2; + } + + public static final boolean lt(int s1, int s2) + { + return compare(s1, s2) < 0; + } + + public static final boolean le(int s1, int s2) + { + return compare(s1, s2) <= 0; + } + + public static final boolean gt(int s1, int s2) + { + return compare(s1, s2) > 0; + } + + public static final boolean ge(int s1, int s2) + { + return compare(s1, s2) >= 0; + } + + public static final boolean eq(int s1, int s2) + { + return s1 == s2; + } + + public static final int min(int s1, int s2) + { + if (lt(s1, s2)) + { + return s1; + } + else + { + return s2; + } + } + + public static final int max(int s1, int s2) + { + if (gt(s1, s2)) + { + return s1; + } + else + { + return s2; + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java new file mode 100644 index 0000000000..a6a8b8beb4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -0,0 +1,260 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.UnsupportedEncodingException; + +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; +import java.util.Stack; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +/** + * Strings + * + */ + +public final class Strings +{ + + private static final byte[] EMPTY = new byte[0]; + + private static final ThreadLocal<char[]> charbuf = new ThreadLocal<char[]>() + { + public char[] initialValue() + { + return new char[4096]; + } + }; + + public static final byte[] toUTF8(String str) + { + if (str == null) + { + return EMPTY; + } + else + { + final int size = str.length(); + char[] chars = charbuf.get(); + if (size > chars.length) + { + chars = new char[Math.max(size, 2*chars.length)]; + charbuf.set(chars); + } + + str.getChars(0, size, chars, 0); + final byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) + { + if (chars[i] > 127) + { + try + { + return str.getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + bytes[i] = (byte) chars[i]; + } + return bytes; + } + } + + public static final String fromUTF8(byte[] bytes) + { + try + { + return new String(bytes, "UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + private static final Pattern VAR = Pattern.compile("(?:\\$\\{([^\\}]*)\\})|(?:\\$(\\$))"); + + public static interface Resolver + { + String resolve(String variable); + } + + public static class MapResolver implements Resolver + { + + private final Map<String,String> map; + + public MapResolver(Map<String,String> map) + { + this.map = map; + } + + public String resolve(String variable) + { + return map.get(variable); + } + } + + public static class PropertiesResolver implements Resolver + { + + private final Properties properties; + + public PropertiesResolver(Properties properties) + { + this.properties = properties; + } + + public String resolve(String variable) + { + return properties.getProperty(variable); + } + } + + public static class ChainedResolver implements Resolver + { + private final Resolver primary; + private final Resolver secondary; + + public ChainedResolver(Resolver primary, Resolver secondary) + { + this.primary = primary; + this.secondary = secondary; + } + + public String resolve(String variable) + { + String result = primary.resolve(variable); + if (result == null) + { + result = secondary.resolve(variable); + } + return result; + } + } + + public static final Resolver SYSTEM_RESOLVER = new Resolver() + { + public String resolve(String variable) + { + String result = System.getProperty(variable); + if (result == null) + { + result = System.getenv(variable); + } + return result; + } + }; + + public static final String expand(String input) + { + return expand(input, SYSTEM_RESOLVER); + } + + public static final String expand(String input, Resolver resolver) + { + return expand(input, resolver, new Stack<String>()); + } + + private static final String expand(String input, Resolver resolver, Stack<String> stack) + { + Matcher m = VAR.matcher(input); + StringBuffer result = new StringBuffer(); + while (m.find()) + { + String var = m.group(1); + if (var == null) + { + String esc = m.group(2); + if ("$".equals(esc)) + { + m.appendReplacement(result, Matcher.quoteReplacement("$")); + } + else + { + throw new IllegalArgumentException(esc); + } + } + else + { + m.appendReplacement(result, Matcher.quoteReplacement(resolve(var, resolver, stack))); + } + } + m.appendTail(result); + return result.toString(); + } + + private static final String resolve(String var, Resolver resolver, Stack<String> stack) + { + if (stack.contains(var)) + { + throw new IllegalArgumentException + (String.format("recursively defined variable: %s stack=%s", var, + stack)); + } + + String result = resolver.resolve(var); + if (result == null) + { + throw new IllegalArgumentException("no such variable: " + var); + } + + stack.push(var); + try + { + return expand(result, resolver, stack); + } + finally + { + stack.pop(); + } + } + + public static final String join(String sep, Iterable items) + { + StringBuilder result = new StringBuilder(); + + for (Object o : items) + { + if (result.length() > 0) + { + result.append(sep); + } + result.append(o.toString()); + } + + return result.toString(); + } + + public static final String join(String sep, Object[] items) + { + return join(sep, Arrays.asList(items)); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java new file mode 100644 index 0000000000..3cfe5afdac --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + + +import java.util.UUID; + +/** + * UUIDGen + * + */ + +public interface UUIDGen +{ + + public UUID generate(); + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java new file mode 100644 index 0000000000..4bf6b7f0a2 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + + +/** + * UUIDs + * + */ + +public final class UUIDs +{ + + public static final UUIDGen newGenerator() + { + return newGenerator(System.getProperty("qpid.uuid.generator", + NameUUIDGen.class.getName())); + } + + public static UUIDGen newGenerator(String name) + { + try + { + Class cls = Class.forName(name); + return (UUIDGen) cls.newInstance(); + } + catch (InstantiationException e) + { + throw new RuntimeException(e); + } + catch (IllegalAccessException e) + { + throw new RuntimeException(e); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java new file mode 100644 index 0000000000..e0c0337898 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java @@ -0,0 +1,34 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * Used to signal that a data element and its producer cannot be requeued or sent an error message when using a + * {@link BatchSynchQueue} because the producer has already been unblocked by an unblocking take on the queue. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Signal that an unblocking take has already occurred. + * </table> + */ +public class AlreadyUnblockedException extends RuntimeException +{ } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java new file mode 100644 index 0000000000..63d8f77edb --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java @@ -0,0 +1,122 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; + +/** + * BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this + * pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform usefull + * work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is + * consumed or until a consumer chooses to release the producer some time after consuming the data from the queue. + * + * <p>There are a number of possible advantages to using this technique when compared with having the producers + * processing their own data: + * + * <ul> + * <li>Data may be deposited asynchronously in the buffer allowing the producers to continue running.</li> + * <li>Data may be deposited synchronously in the buffer so that producers wait until their data has been processed + * before being allowed to continue.</li> + * <li>Variable rates of production/consumption can be smoothed over by the buffer as it provides space in memory to + * hold data between production and consumption.</li> + * <li>Consumers may be able to batch data as they consume it leading to more efficient consumption over + * individual data item consumption where latency associated with the consume operation can be ammortized. + * For example, it may be possibly to ammortize the cost of a disk seek over many producers.</li> + * <li>Data from seperate threads can be combined together in the buffer, providing a convenient way of spreading work + * amongst many workers and gathering the results together again.</li> + * <li>Different types of queue can be used to hold the buffer, resulting in different processing orders. For example, + * lifo, fifo, priority heap, etc.</li> + * </ul> + * + * <p/>The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package + * (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the + * blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional + * take methods that can be used to take data from a queue without releasing producers, so that consumers have an + * opportunity to confirm correct processing of the data before producers are released. It also adds a put method with + * exceptions so that consumers can signal exception cases back to producers where there are errors in the data. + * + * <p/>This type of queue is usefull in situations where consumers can obtain an efficiency gain by batching data + * from many threads but where synchronous handling of that data is neccessary because producers need to know that + * their data has been processed before they continue. For example, sending a bundle of messages together, or writing + * many records to disk at once, may result in improved performance but the originators of the messages or disk records + * need confirmation that their data has really been sent or saved to disk. + * + * <p/>The consumer can put an element back onto the queue or send an error message to the elements producer using the + * {@link SynchRecord} interface. + * + * <p/>The {@link #take()}, {@link #drainTo(java.util.Collection<? super E>)} and + * {@link #drainTo(java.util.Collection<? super E>, int)} methods from {@link BlockingQueue} should behave as if they + * have been called with unblock set to false. That is they take elements from the queue but leave the producers + * blocked. These methods do not return collections of {@link SynchRecord}s so they do not supply an interface through + * which errors or re-queuings can be applied. If these methods are used then the consumer must succesfully process + * all the records it takes. + * + * <p/>The {@link #put} method should silently swallow any exceptions that consumers attempt to return to the caller. + * In order to handle exceptions the {@link #tryPut} method must be used. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Handle synchronous puts, with possible exceptions. + * <tr><td> Allow consumers to take many records from a queue in a batch. + * <tr><td> Allow consumers to decide when to unblock synchronous producers. + * </table> + */ +public interface BatchSynchQueue<E> extends BlockingQueue<E> +{ + /** + * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the + * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. + * + * @param e The data element to put into the queue. + * + * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting + * on its entry in the queue being consumed. + * @throws SynchException If a consumer encounters an error whilst processing the data element. + */ + public void tryPut(E e) throws InterruptedException, SynchException; + + /** + * Takes all available data items from the queue or blocks until some become available. The returned items + * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param c The collection to drain the data items into. + * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock); + + /** + * Takes up to maxElements available data items from the queue or blocks until some become available. The returned + * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param c The collection to drain the data items into. + * @param maxElements The maximum number of elements to drain. + * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection<SynchRecord<E>> c, int maxElements, boolean unblock); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java new file mode 100644 index 0000000000..4564b1d686 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java @@ -0,0 +1,834 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data. + * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being + * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and + * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is + * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch. + * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls. + * + * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete + * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract}, + * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement + * the buffer other than by a queue, for example, by using an array. + * + * <p/>Normal queue methods to work asynchronously. + * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately + * when their data is taken. + * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the + * option to keep producers blocked until the consumer decides to release them. + * + * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to + * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency + * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io) + * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the + * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an + * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to + * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E> +{ + /** Used for logging. */ + private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class); + + /** Holds a reference to the queue implementation that holds the buffer. */ + Queue<SynchRecordImpl<E>> buffer; + + /** Holds the number of items in the queue */ + private int count; + + /** Main lock guarding all access */ + private ReentrantLock lock; + + /** Condition for waiting takes */ + private Condition notEmpty; + + /** Condition for waiting puts */ + private Condition notFull; + + /** + * Creates a batch synch queue without fair thread scheduling. + */ + public BatchSynchQueueBase() + { + this(false); + } + + /** + * Ensures that the underlying buffer implementation is created. + * + * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer. + */ + public BatchSynchQueueBase(boolean fair) + { + buffer = this.createQueue(); + + // Create the buffer lock with the fairness flag set accordingly. + lock = new ReentrantLock(fair); + + // Create the non-empty and non-full condition monitors on the buffer lock. + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } + + /** + * Returns an iterator over the elements contained in this collection. + * + * @return An iterator over the elements contained in this collection. + */ + public Iterator<E> iterator() + { + throw new RuntimeException("Not implemented."); + } + + /** + * Returns the number of elements in this collection. If the collection contains more than + * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>. + * + * @return The number of elements in this collection. + */ + public int size() + { + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return count; + } + finally + { + lock.unlock(); + } + } + + /** + * Inserts the specified element into this queue, if possible. When using queues that may impose insertion + * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method + * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception. + * + * @param e The element to insert. + * + * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt> + */ + public boolean offer(E e) + { + if (e == null) + { + throw new NullPointerException(); + } + + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return insert(e, false); + } + finally + { + lock.unlock(); + } + } + + /** + * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to + * become available. + * + * @param e The element to add. + * @param timeout How long to wait before giving up, in units of <tt>unit</tt> + * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter. + * + * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is + * available. + * + * @throws InterruptedException If interrupted while waiting. + * @throws NullPointerException If the specified element is <tt>null</tt>. + */ + public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException + { + if (e == null) + { + throw new NullPointerException(); + } + + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + + long nanos = unit.toNanos(timeout); + + try + { + do + { + if (insert(e, false)) + { + return true; + } + + try + { + nanos = notFull.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + notFull.signal(); // propagate to non-interrupted thread + throw ie; + } + } + while (nanos > 0); + + return false; + } + finally + { + lock.unlock(); + } + } + + /** + * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty. + * + * @return The head of this queue, or <tt>null</tt> if this queue is empty. + */ + public E poll() + { + final ReentrantLock lock = this.lock; + + lock.lock(); + try + { + if (count == 0) + { + return null; + } + + E x = extract(true, true).getElement(); + + return x; + } + finally + { + lock.unlock(); + } + } + + /** + * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements + * are present on this queue. + * + * @param timeout How long to wait before giving up, in units of <tt>unit</tt>. + * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter. + * + * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present. + * + * @throws InterruptedException If interrupted while waiting. + */ + public E poll(long timeout, TimeUnit unit) throws InterruptedException + { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try + { + long nanos = unit.toNanos(timeout); + + do + { + if (count != 0) + { + E x = extract(true, true).getElement(); + + return x; + } + + try + { + nanos = notEmpty.awaitNanos(nanos); + } + catch (InterruptedException ie) + { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + } + while (nanos > 0); + + return null; + } + finally + { + lock.unlock(); + } + } + + /** + * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty. + * + * @return The head of this queue, or <tt>null</tt> if this queue is empty. + */ + public E peek() + { + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return peekAtBufferHead(); + } + finally + { + lock.unlock(); + } + } + + /** + * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) + * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit. + * + * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by + * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt> + * or <tt>take</tt> an element. + * + * @return The remaining capacity. + */ + public int remainingCapacity() + { + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + return getBufferCapacity() - count; + } + finally + { + lock.unlock(); + } + } + + /** + * Adds the specified element to this queue, waiting if necessary for space to become available. + * + * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised + * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these + * exceptions. + * + * @param e The element to add. + * + * @throws InterruptedException If interrupted while waiting. + */ + public void put(E e) throws InterruptedException + { + try + { + tryPut(e); + } + catch (SynchException ex) + { + // This exception is deliberately ignored. See the method comment for information about this. + } + } + + /** + * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the + * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. + * + * @param e The data element to put into the queue. Cannot be null. + * + * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting + * on its entry in the queue being consumed. + * @throws SynchException If a consumer encounters an error whilst processing the data element. + */ + public void tryPut(E e) throws InterruptedException, SynchException + { + if (e == null) + { + throw new NullPointerException(); + } + + // final Queue<E> items = this.buffer; + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + + try + { + while (count == getBufferCapacity()) + { + // Release the lock and wait until the queue is not full. + notFull.await(); + } + } + catch (InterruptedException ie) + { + notFull.signal(); // propagate to non-interrupted thread + throw ie; + } + + // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block + // the producer until its data is taken. + insert(e, true); + } + + /** + * Retrieves and removes the head of this queue, waiting if no elements are present on this queue. + * Any producer that has its data element taken by this call will be immediately unblocked. To keep the + * producer blocked whilst taking just a single item, use the + * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)} + * method. There is no take method to do that because there is not usually any advantage in a synchronous hand + * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption + * latencies accross many producers where possible. + * + * @return The head of this queue. + * + * @throws InterruptedException if interrupted while waiting. + */ + public E take() throws InterruptedException + { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + + try + { + try + { + while (count == 0) + { + // Release the lock and wait until the queue becomes non-empty. + notEmpty.await(); + } + } + catch (InterruptedException ie) + { + notEmpty.signal(); // propagate to non-interrupted thread + throw ie; + } + + // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is + // not full, and unblock the producer that owns the data item that is taken. + E x = extract(true, true).getElement(); + + return x; + } + finally + { + lock.unlock(); + } + } + + /** + * Removes all available elements from this queue and adds them into the given collection. This operation may be + * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements + * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated + * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further, + * the behavior of this operation is undefined if the specified collection is modified while the operation is in + * progress. + * + * @param objects The collection to transfer elements into. + * + * @return The number of elements transferred. + * + * @throws NullPointerException If objects is null. + * @throws IllegalArgumentException If objects is this queue. + */ + public int drainTo(Collection<? super E> objects) + { + return drainTo(objects, -1); + } + + /** + * Removes at most the given number of available elements from this queue and adds them into the given collection. + * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements + * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue + * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if + * the specified collection is modified while the operation is in progress. + * + * @param objects The collection to transfer elements into. + * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning + * all elements. + * + * @return The number of elements transferred. + * + * @throws NullPointerException If c is null. + * @throws IllegalArgumentException If c is this queue. + */ + public int drainTo(Collection<? super E> objects, int maxElements) + { + if (objects == null) + { + throw new NullPointerException(); + } + + if (objects == this) + { + throw new IllegalArgumentException(); + } + + // final Queue<E> items = this.buffer; + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + int n = 0; + + for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) + { + // Take items from the queue, do unblock the producers, but don't send not full signals yet. + objects.add(extract(true, false).getElement()); + } + + if (n > 0) + { + // count -= n; + notFull.signalAll(); + } + + return n; + } + finally + { + lock.unlock(); + } + } + + /** + * Takes all available data items from the queue or blocks until some become available. The returned items + * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param c The collection to drain the data items into. + * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock) + { + return drainTo(c, -1, unblock); + } + + /** + * Takes up to maxElements available data items from the queue or blocks until some become available. The returned + * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their + * producers, where the producers are still blocked. + * + * @param coll The collection to drain the data items into. + * @param maxElements The maximum number of elements to drain. + * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. + * + * @return A count of the number of elements that were drained from the queue. + */ + public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock) + { + if (coll == null) + { + throw new NullPointerException(); + } + + // final Queue<E> items = this.buffer; + final ReentrantLock lock = this.lock; + lock.lock(); + + try + { + int n = 0; + + for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) + { + // Extract the next record from the queue, don't signall the not full condition yet and release + // producers depending on whether the caller wants to or not. + coll.add(extract(false, unblock)); + } + + if (n > 0) + { + // count -= n; + notFull.signalAll(); + } + + return new SynchRefImpl(n, coll); + } + finally + { + lock.unlock(); + } + } + + /** + * This abstract method should be overriden to return an empty queue. Different implementations of producer + * consumer buffers can control the order in which data is accessed using different queue implementations. + * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete + * implementations. + * + * @return An empty queue. + */ + protected abstract <T> Queue<T> createQueue(); + + /** + * Insert element into the queue, then possibly signal that the queue is not empty and block the producer + * on the element until permission to procede is given. + * + * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process + * will be able to get access to the queue. Hence, unlock and block are always set together. + * + * <p/>Call only when holding the global lock. + * + * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked. + * + * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this + * method may not return straight away, but only after the producer is unblocked by having its data + * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no + * matter what value the unlockAndBlock flag has, leaving the global lock on. + */ + protected boolean insert(E x, boolean unlockAndBlock) + { + // Create a new record for the data item. + SynchRecordImpl<E> record = new SynchRecordImpl<E>(x); + + boolean result = buffer.offer(record); + + if (result) + { + count++; + + // Tell any waiting consumers that the queue is not empty. + notEmpty.signal(); + + if (unlockAndBlock) + { + // Allow other threads to read/write the queue. + lock.unlock(); + + // Wait until a consumer takes this data item. + record.waitForConsumer(); + } + + return true; + } + else + { + return false; + } + } + + /** + * Extract element at current take position, advance, and signal. + * + * <p/>Call only when holding lock. + */ + protected SynchRecordImpl<E> extract(boolean unblock, boolean signal) + { + SynchRecordImpl<E> result = buffer.remove(); + count--; + + if (signal) + { + notFull.signal(); + } + + if (unblock) + { + result.releaseImmediately(); + } + + return result; + } + + /** + * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned. + * + * <p/>Call only when holding lock. + * + * @return The maximum capacity of the buffer. + */ + protected int getBufferCapacity() + { + if (buffer instanceof Capacity) + { + return ((Capacity) buffer).getCapacity(); + } + else + { + return Integer.MAX_VALUE; + } + } + + /** + * Return the head element from the buffer. + * + * <p/>Call only when holding lock. + * + * @return The head element from the buffer. + */ + protected E peekAtBufferHead() + { + return buffer.peek().getElement(); + } + + public class SynchRefImpl implements SynchRef + { + /** Holds the number of synch records associated with this reference. */ + int numRecords; + + /** Holds a reference to the collection of synch records managed by this. */ + Collection<SynchRecord<E>> records; + + public SynchRefImpl(int n, Collection<SynchRecord<E>> records) + { + this.numRecords = n; + this.records = records; + } + + public int getNumRecords() + { + return numRecords; + } + + /** + * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked + * when this method is called. The exception to this is producers that have had their data put back onto the queue + * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked + * but will not return from their put call normally, but with an exception instead. + */ + public void unblockProducers() + { + log.debug("public void unblockProducers(): called"); + + if (records != null) + { + for (SynchRecord<E> record : records) + { + // This call takes account of items that have already been released, are to be requeued or are in + // error. + record.releaseImmediately(); + } + } + + records = null; + } + } + + /** + * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows + * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when + * its data cannot be consumed. + */ + public class SynchRecordImpl<E> implements SynchRecord<E> + { + /** A boolean latch that determines when the producer for this data item will be allowed to continue. */ + BooleanLatch latch = new BooleanLatch(); + + /** The data element associated with this item. */ + E element; + + /** + * Create a new synch record. + * + * @param e The data element that the record encapsulates. + */ + public SynchRecordImpl(E e) + { + // Keep the data element. + element = e; + } + + /** + * Waits until the producer is given permission to proceded by a consumer. + */ + public void waitForConsumer() + { + latch.await(); + } + + /** + * Gets the data element contained by this record. + * + * @return The data element contained by this record. + */ + public E getElement() + { + return element; + } + + /** + * Immediately releases the producer of this data record. Consumers can bring the synchronization time of + * producers to a minimum by using this method to release them at the earliest possible moment when batch + * consuming records from sychronized producers. + */ + public void releaseImmediately() + { + // Check that the record has not already been released, is in error or is to be requeued. + latch.signal(); + + // Propagate errors to the producer. + + // Requeue items to be requeued. + } + + /** + * Tells the synch queue to put this element back onto the queue instead of releasing its producer. + * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or + * the {@link #releaseImmediately()} method. + * + * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this + * element has already been unblocked. + */ + public void reQueue() + { + throw new RuntimeException("Not implemented."); + } + + /** + * Tells the synch queue to raise an exception with this elements producer. The exception is not raised + * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the + * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is + * raised on the producer. + * + * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used + * because the exception is to be passed onto a different thread. + * + * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this + * element has already been unblocked. + * + * @param e The exception to raise on the producer. + */ + public void inError(Exception e) + { + throw new RuntimeException("Not implemented."); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java new file mode 100644 index 0000000000..0e4a07594f --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java @@ -0,0 +1,128 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * A BooleanLatch is like a set of traffic lights, where threads can wait at a red light until another thread gives + * the green light. When threads arrive at the latch it is initially red. They queue up until the green signal is + * given, at which point they can all acquire the latch in shared mode and continue to run concurrently. Once the latch + * is signalled it cannot be reset to red again. + * + * <p/> The latch uses a {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} to implement its synchronization. + * This has two internal states, 0 which means that the latch is blocked, and 1 which means that the latch is open. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Block threads until a go signal is given. + * </table> + * + * @todo Might be better to use a countdown latch to count down from 1. Its await method can throw interrupted + * exception which makes the possibility of interruption more explicit, and provides a reminder to recheck the + * latch condition before continuing. + */ +public class BooleanLatch +{ + /** Holds the synchronizer that provides the thread queueing synchronization. */ + private final Sync sync = new Sync(); + + /** + * Tests whether or not the latch has been signalled, that is to say that, the light is green. + * + * <p/>This method is non-blocking. + * + * @return <tt>true</tt> if the latch may be acquired; the light is green. + */ + public boolean isSignalled() + { + return sync.isSignalled(); + } + + /** + * Waits on the latch until the signal is given and the light is green. If the light is already green then the + * latch will be acquired and the thread will not have to wait. + * + * <p/>This method will block until the go signal is given or the thread is otherwise interrupted. Before carrying + * out any processing threads that return from this method should confirm that the go signal has really been given + * on this latch by calling the {@link #isSignalled()} method. + */ + public void await() + { + sync.acquireShared(1); + } + + /** + * Releases any threads currently waiting on the latch. This flips the light to green allowing any threads that + * were waiting for this condition to now run. + * + * <p/>This method is non-blocking. + */ + public void signal() + { + sync.releaseShared(1); + } + + /** + * Implements a thread queued synchronizer. The internal state 0 means that the queue is blocked and the internl + * state 1 means that the queue is released and that all waiting threads can acquire the synchronizer in shared + * mode. + */ + private static class Sync extends AbstractQueuedSynchronizer + { + /** + * Attempts to acquire this synchronizer in shared mode. It may be acquired once it has been released. + * + * @param ignore This parameter is ignored. + * + * @return 1 if the shared acquisition succeeds and -1 if it fails. + */ + protected int tryAcquireShared(int ignore) + { + return isSignalled() ? 1 : -1; + } + + /** + * Releases the synchronizer, setting its internal state to 1. + * + * @param ignore This parameter is ignored. + * + * @return <tt>true</tt> always. + */ + protected boolean tryReleaseShared(int ignore) + { + setState(1); + + return true; + } + + /** + * Tests if the synchronizer is signalled. It is signalled when its internal state it 1. + * + * @return <tt>true</tt> if the internal state is 1, <tt>false</tt> otherwise. + */ + boolean isSignalled() + { + return getState() != 0; + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java new file mode 100644 index 0000000000..a97ce0e172 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java @@ -0,0 +1,35 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * An interface exposed by data structures that have a maximum capacity. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Report the maximum capacity. + * </table> + */ +public interface Capacity +{ + public int getCapacity(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java new file mode 100644 index 0000000000..bc63eb0353 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java @@ -0,0 +1,50 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.Queue; + +/** + * SynchBuffer completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying + * queue as an array. This uses FIFO ordering for the queue but restricts the maximum size of the queue to a fixed + * amount. It also has the advantage that, as the buffer does not grow and shrink dynamically, memory for the buffer + * is allocated up front and does not create garbage during the operation of the queue. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide array based FIFO queue to create a batch synched queue around. + * </table> + * + * @todo Write an array based buffer implementation that implements Queue. + */ +public class SynchBuffer<E> extends BatchSynchQueueBase<E> +{ + /** + * Returns an empty queue, implemented as an array. + * + * @return An empty queue, implemented as an array. + */ + protected <T> Queue<T> createQueue() + { + throw new RuntimeException("Not implemented."); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java new file mode 100644 index 0000000000..99a83f96cd --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java @@ -0,0 +1,52 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * SynchException is used to encapsulate exceptions with the data elements that caused them in order to send exceptions + * back from the consumers of a {@link BatchSynchQueue} to producers. The underlying exception should be retrieved from + * the {@link #getCause} method. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Encapsulate a data element and exception. + * </table> + */ +public class SynchException extends Exception +{ + /** Holds the data element that is in error. */ + Object element; + + /** + * Creates a new BaseApplicationException object. + * + * @param message The exception message. + * @param cause The underlying throwable cause. This may be null. + */ + public SynchException(String message, Throwable cause, Object element) + { + super(message, cause); + + // Keep the data element that was in error. + this.element = element; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java new file mode 100644 index 0000000000..95833f398a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java @@ -0,0 +1,48 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.util.LinkedList; +import java.util.Queue; + +/** + * SynchQueue completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying + * queue as a linked list. This uses FIFO ordering for the queue and allows the queue to grow to accomodate more + * elements as needed. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide linked list FIFO queue to create a batch synched queue around. + * </table> + */ +public class SynchQueue<E> extends BatchSynchQueueBase<E> +{ + /** + * Returns an empty queue, implemented as a linked list. + * + * @return An empty queue, implemented as a linked list. + */ + protected <T> Queue<T> createQueue() + { + return new LinkedList<T>(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java new file mode 100644 index 0000000000..fd740c20cd --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java @@ -0,0 +1,74 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * SynchRecord associates a data item from a {@link BatchSynchQueue} with its producer. This enables the data item data + * item to be put back on the queue without unblocking its producer, or to send exceptions to the producer. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Get the underlying data element. + * <tr><td> Put the data element back on the queue without unblocking its producer. + * <tr><td> Send and exception to the data elements producer. + * </table> + */ +public interface SynchRecord<E> +{ + /** + * Gets the data element contained by this record. + * + * @return The data element contained by this record. + */ + public E getElement(); + + /** + * Tells the synch queue to put this element back onto the queue instead of releasing its producer. + * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method. + * + * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element + * has already been unblocked. + */ + public void reQueue(); + + /** + * Immediately releases the producer of this data record. Consumers can bring the synchronization time of + * producers to a minimum by using this method to release them at the earliest possible moment when batch + * consuming records from sychronized producers. + */ + public void releaseImmediately(); + + /** + * Tells the synch queue to raise an exception with this elements producer. The exception is not raised immediately + * but upon calling the {@link SynchRef#unblockProducers()} method. The exception will be wrapped in a + * {@link SynchException} before it is raised on the producer. + * + * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used + * because the exception is to be passed onto a different thread. + * + * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element + * has already been unblocked. + * + * @param e The exception to raise on the producer. + */ + public void inError(Exception e); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java new file mode 100644 index 0000000000..efe2344c06 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java @@ -0,0 +1,51 @@ +package org.apache.qpid.util.concurrent; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * A SynchRef is an interface which is returned from the synchronous take and drain methods of {@link BatchSynchQueue}, + * allowing call-backs to be made against the synchronizing strucutre. It allows the consumer to communicate when it + * wants producers that have their data taken to be unblocked. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities + * <tr><td> Report number of records returned by a taking operation. + * <tr><td> Provide call-back to release producers of taken records. + * </table> + */ +public interface SynchRef +{ + /** + * Reports the number of records taken by the take or drain operation. + * + * @return The number of records taken by the take or drain operation. + */ + public int getNumRecords(); + + /** + * Any producers that have had their data elements taken from the queue but have not been unblocked are + * unblocked when this method is called. The exception to this is producers that have had their data put back + * onto the queue by a consumer. Producers that have had exceptions for their data items registered by consumers + * will be unblocked but will not return from their put call normally, but with an exception instead. + */ + public void unblockProducers(); +} |