summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/util
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/util')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java695
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java258
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java70
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java395
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java59
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java264
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java75
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java108
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java260
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java36
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java59
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java122
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java834
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java128
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java35
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java50
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java52
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java74
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java51
24 files changed, 3827 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
new file mode 100644
index 0000000000..09478d4157
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
@@ -0,0 +1,695 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.*;
+
+/**
+ * CommandLineParser provides a utility for specifying the format of a command line and parsing command lines to ensure
+ * that they fit their specified format. A command line is made up of flags and options, both may be refered to as
+ * options. A flag is an option that does not take an argument (specifying it means it has the value 'true' and not
+ * specifying it means it has the value 'false'). Options must take arguments but they can be set up with defaults so
+ * that they take a default value when not set. Options may be mandatory in wich case it is an error not to specify
+ * them on the command line. Flags are never mandatory because they are implicitly set to false when not specified.
+ *
+ * <p/>Some example command lines are:
+ *
+ * <ul>
+ * <li>This one has two options that expect arguments:
+ * <pre>
+ * cruisecontrol -configfile cruisecontrol.xml -port 9000
+ * </pre>
+ * <li>This has one no-arg flag and two 'free' arguments:
+ * <pre>
+ * zip -r project.zip project/*
+ * </pre>
+ * <li>This one concatenates multiple flags into a single block with only one '-':
+ * <pre>
+ * jar -tvf mytar.tar
+ * </pre>
+ *
+ * <p/>The parsing rules are:
+ *
+ * <ol>
+ * <li>Flags may be combined after a single '-' because they never take arguments. Normally such flags are single letter
+ * flags but this is only a convention and not enforced. Flags of more than one letter are usually specified on their own.
+ * <li>Options expecting arguments must always be on their own.
+ * <li>The argument to an option may be seperated from it by whitespace or appended directly onto the option.
+ * <li>The argument to an option may never begin with a '-' character.
+ * <li>All other arguments not beginning with a '-' character are free arguments that do not belong to any option.
+ * <li>The second or later of a set of duplicate or repeated flags are ignored.
+ * <li>Options are matched up to the shortest matching option. This is because of the possibility of having no space
+ * between an option and its argument. This rules out the possibility of using two options where one is an opening
+ * substring of the other. For example, the options "foo" and "foobar" cannot be used on the same command line because
+ * it is not possible to distinguish the argument "-foobar" from being the "foobar" option or the "foo" option with
+ * the "bar" argument.
+ * </ol>
+ *
+ * <p/>By default, unknown options are simply ignored if specified on the command line. This behaviour may be changed
+ * so that the parser reports all unknowns as errors by using the {@link #setErrorsOnUnknowns} method.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Accept a command line specification.
+ * <tr><td> Parse a command line into properties, validating it against its specification.
+ * <tr><td> Report all errors between a command line and its specification.
+ * <tr><td> Provide a formatted usage string for a command line.
+ * <tr><td> Provide a formatted options in force string for a command line.
+ * <tr><td> Allow errors on unknowns behaviour to be turned on or off.
+ * </table>
+ */
+public class CommandLineParser
+{
+ /** Holds a mapping from command line option names to detailed information about those options. */
+ private Map<String, CommandLineOption> optionMap = new HashMap<String, CommandLineOption>();
+
+ /** Holds a list of parsing errors. */
+ private List<String> parsingErrors = new ArrayList<String>();
+
+ /** Holds the regular expression matcher to match command line options with. */
+ private Matcher optionMatcher = null;
+
+ /** Holds the parsed command line properties after parsing. */
+ private Properties parsedProperties = null;
+
+ /** Flag used to indicate that errors should be created for unknown options. False by default. */
+ private boolean errorsOnUnknowns = false;
+
+ /**
+ * Creates a command line options parser from a command line specification. This is passed to this constructor
+ * as an array of arrays of strings. Each array of strings specifies the command line for a single option. A static
+ * array may therefore easily be used to configure the command line parser in a single method call with an easily
+ * readable format.
+ *
+ * <p/>Each array of strings must be 2, 3, 4 or 5 elements long. If any of the last three elements are missing they
+ * are assumed to be null. The elements specify the following parameters:
+ * <ol>
+ * <li>The name of the option without the leading '-'. For example, "file". To specify the format of the 'free'
+ * arguments use the option names "1", "2", ... and so on.
+ * <li>The option comment. A line of text describing the usage of the option. For example, "The file to be processed."
+ * <li>The options argument. This is a very short description of the argument to the option, often a single word
+ * or a reminder as to the arguments format. When this element is null the option is a flag and does not
+ * accept any arguments. For example, "filename" or "(unix | windows)" or null. The actual text specified
+ * is only used to print in the usage message to remind the user of the usage of the option.
+ * <li>The mandatory flag. When set to "true" an option must always be specified. Any other value, including null,
+ * means that the option is mandatory. Flags are always mandatory (see class javadoc for explanation of why) so
+ * this is ignored for flags.
+ * <li>A regular expression describing the format that the argument must take. Ignored if null.
+ * </ol>
+ * <p/>An example call to this constructor is:
+ *
+ * <pre>
+ * CommandLineParser commandLine = new CommandLineParser(
+ * new String[][] {{"file", "The file to be processed. ", "filename", "true"},
+ * {"dir", "Directory to store results in. Current dir used if not set.", "out dir"},
+ * {"os", "Operating system EOL format to use.", "(windows | unix)", null, "windows\|unix"},
+ * {"v", "Verbose mode. Prints information about the processing as it goes."},
+ * {"1", "The processing command to run.", "command", "true", "add\|remove\|list"}});
+ * </pre>
+ *
+ * @param config The configuration as an array of arrays of strings.
+ */
+ public CommandLineParser(String[][] config)
+ {
+ // Loop through all the command line option specifications creating details for each in the options map.
+ for (int i = 0; i < config.length; i++)
+ {
+ String[] nextOptionSpec = config[i];
+
+ addOption(nextOptionSpec[0], nextOptionSpec[1], (nextOptionSpec.length > 2) ? nextOptionSpec[2] : null,
+ (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false,
+ (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null);
+ }
+ }
+
+ /**
+ * Lists all the parsing errors from the most recent parsing in a string.
+ *
+ * @return All the parsing errors from the most recent parsing.
+ */
+ public String getErrors()
+ {
+ // Return the empty string if there are no errors.
+ if (parsingErrors.isEmpty())
+ {
+ return "";
+ }
+
+ // Concatenate all the parsing errors together.
+ StringBuilder result = new StringBuilder();
+
+ for (String s : parsingErrors)
+ {
+ result.append(s);
+ }
+
+ return result.toString();
+ }
+
+ /**
+ * Lists the properties set from the most recent parsing or an empty string if no parsing has been done yet.
+ *
+ * @return The properties set from the most recent parsing or an empty string if no parsing has been done yet.
+ */
+ public String getOptionsInForce()
+ {
+ // Check if there are no properties to report and return and empty string if so.
+ if (parsedProperties == null)
+ {
+ return "";
+ }
+
+ // List all the properties.
+ StringBuilder result = new StringBuilder("Options in force:\n");
+
+ for (Map.Entry<Object, Object> property : parsedProperties.entrySet())
+ {
+ result.append(property.getKey())
+ .append(" = ")
+ .append(property.getValue())
+ .append('\n');
+ }
+
+ return result.toString();
+ }
+
+ /**
+ * Generates a usage string consisting of the name of each option and each options argument description and
+ * comment.
+ *
+ * @return A usage string for all the options.
+ */
+ public String getUsage()
+ {
+ String result = "Options:\n";
+
+ // Print usage on each of the command line options.
+ for (CommandLineOption optionInfo : optionMap.values())
+ {
+ result +=
+ "-" + optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "")
+ + optionInfo.comment + "\n";
+ }
+
+ return result;
+ }
+
+ /**
+ * Control the behaviour of the errors on unkowns reporting. When turned on this reports all unkowns options
+ * as errors. When turned off, all unknowns are simply ignored.
+ *
+ * @param errors The setting of the errors on unkown flag. True to turn it on.
+ */
+ public void setErrorsOnUnknowns(boolean errors)
+ {
+ errorsOnUnknowns = errors;
+ }
+
+ /**
+ * Parses a set of command line arguments into a set of properties, keyed by the argument flag. The free arguments
+ * are keyed by integers as strings starting at "1" and then "2", ... and so on.
+ *
+ * <p/>See the class level comment for a description of the parsing rules.
+ *
+ * @param args The command line arguments.
+ *
+ * @return The arguments as a set of properties.
+ *
+ * @throws IllegalArgumentException If the command line cannot be parsed against its specification. If this exception
+ * is thrown a call to {@link #getErrors} will provide a diagnostic of the command
+ * line errors.
+ */
+ public Properties parseCommandLine(String[] args) throws IllegalArgumentException
+ {
+ Properties options = new Properties();
+
+ // Used to keep count of the current 'free' argument.
+ int free = 1;
+
+ // Used to indicate that the most recently parsed option is expecting arguments.
+ boolean expectingArgs = false;
+
+ // The option that is expecting arguments from the next element of the command line.
+ String optionExpectingArgs = null;
+
+ // Used to indicate that the most recently parsed option is a duplicate and should be ignored.
+ boolean ignore = false;
+
+ // Create the regular expression matcher for the command line options.
+ StringBuilder regexp = new StringBuilder("^(");
+ int optionsAdded = 0;
+
+ for (Iterator<String> i = optionMap.keySet().iterator(); i.hasNext();)
+ {
+ String nextOption = i.next();
+
+ // Check that the option is not a free argument definition.
+ boolean notFree = false;
+
+ try
+ {
+ Integer.parseInt(nextOption);
+ }
+ catch (NumberFormatException e)
+ {
+ notFree = true;
+ }
+
+ // Add the option to the regular expression matcher if it is not a free argument definition.
+ if (notFree)
+ {
+ regexp.append(nextOption)
+ .append(i.hasNext() ? "|" : "");
+ optionsAdded++;
+ }
+ }
+
+ // There has to be more that one option in the regular expression or else the compiler complains that the close
+ // cannot be nullable if the '?' token is used to make the matched option string optional.
+ regexp.append(')')
+ .append(((optionsAdded > 0) ? "?" : ""))
+ .append("(.*)");
+ Pattern pattern = Pattern.compile(regexp.toString());
+
+ // Loop through all the command line arguments.
+ for (int i = 0; i < args.length; i++)
+ {
+ // Check if the next command line argument begins with a '-' character and is therefore the start of
+ // an option.
+ if (args[i].startsWith("-"))
+ {
+ // Extract the value of the option without the leading '-'.
+ String arg = args[i].substring(1);
+
+ // Match up to the longest matching option.
+ optionMatcher = pattern.matcher(arg);
+ optionMatcher.matches();
+
+ String matchedOption = optionMatcher.group(1);
+
+ // Match any argument directly appended onto the longest matching option.
+ String matchedArg = optionMatcher.group(2);
+
+ // Check that a known option was matched.
+ if ((matchedOption != null) && !"".equals(matchedOption))
+ {
+ // Get the command line option information for the matched option.
+ CommandLineOption optionInfo = optionMap.get(matchedOption);
+
+ // Check if this option is expecting arguments.
+ if (optionInfo.expectsArgs)
+ {
+ // The option is expecting arguments so swallow the next command line argument as an
+ // argument to this option.
+ expectingArgs = true;
+ optionExpectingArgs = matchedOption;
+
+ // In the mean time set this options argument to the empty string in case no argument is ever
+ // supplied.
+ // options.put(matchedOption, "");
+ }
+
+ // Check if the option was matched on its own and is a flag in which case set that flag.
+ if ("".equals(matchedArg) && !optionInfo.expectsArgs)
+ {
+ options.put(matchedOption, "true");
+ }
+ // The option was matched as a substring with its argument appended to it or is a flag that is
+ // condensed together with other flags.
+ else if (!"".equals(matchedArg))
+ {
+ // Check if the option is a flag and therefore is allowed to be condensed together
+ // with other flags.
+ if (!optionInfo.expectsArgs)
+ {
+ // Set the first matched flag.
+ options.put(matchedOption, "true");
+
+ // Repeat the longest matching process on the remainder but ensure that the remainder
+ // consists only of flags as only flags may be condensed together in this fashion.
+ do
+ {
+ // Match the remainder against the options.
+ optionMatcher = pattern.matcher(matchedArg);
+ optionMatcher.matches();
+
+ matchedOption = optionMatcher.group(1);
+ matchedArg = optionMatcher.group(2);
+
+ // Check that an option was matched.
+ if (matchedOption != null)
+ {
+ // Get the command line option information for the next matched option.
+ optionInfo = optionMap.get(matchedOption);
+
+ // Ensure that the next option is a flag or raise an error if not.
+ if (optionInfo.expectsArgs == true)
+ {
+ parsingErrors.add("Option " + matchedOption + " cannot be combined with flags.\n");
+ }
+
+ options.put(matchedOption, "true");
+ }
+ // The remainder could not be matched against a flag it is either an unknown flag
+ // or an illegal argument to a flag.
+ else
+ {
+ parsingErrors.add("Illegal argument to a flag in the option " + arg + "\n");
+
+ break;
+ }
+ }
+ // Continue until the remainder of the argument has all been matched with flags.
+ while (!"".equals(matchedArg));
+ }
+ // The option is expecting an argument, so store the unmatched portion against it
+ // as its argument.
+ else
+ {
+ // Check the arguments format is correct against any specified format.
+ checkArgumentFormat(optionInfo, matchedArg);
+
+ // Store the argument against its option (regardless of its format).
+ options.put(matchedOption, matchedArg);
+
+ // The argument to this flag has already been supplied to it. Do not swallow the
+ // next command line argument as an argument to this flag.
+ expectingArgs = false;
+ }
+ }
+ }
+ else // No matching option was found.
+ {
+ // Add this to the list of parsing errors if errors on unkowns is being used.
+ if (errorsOnUnknowns)
+ {
+ parsingErrors.add("Option " + matchedOption + " is not a recognized option.\n");
+ }
+ }
+ }
+ // The command line argument did not being with a '-' so it is an argument to the previous flag or it
+ // is a free argument.
+ else
+ {
+ // Check if a previous flag is expecting to swallow this next argument as its argument.
+ if (expectingArgs)
+ {
+ // Get the option info for the option waiting for arguments.
+ CommandLineOption optionInfo = optionMap.get(optionExpectingArgs);
+
+ // Check the arguments format is correct against any specified format.
+ checkArgumentFormat(optionInfo, args[i]);
+
+ // Store the argument against its option (regardless of its format).
+ options.put(optionExpectingArgs, args[i]);
+
+ // Clear the expecting args flag now that the argument has been swallowed.
+ expectingArgs = false;
+ optionExpectingArgs = null;
+ }
+ // This command line option is not an argument to any option. Add it to the set of 'free' options.
+ else
+ {
+ // Get the option info for the free option, if there is any.
+ CommandLineOption optionInfo = optionMap.get(Integer.toString(free));
+
+ if (optionInfo != null)
+ {
+ // Check the arguments format is correct against any specified format.
+ checkArgumentFormat(optionInfo, args[i]);
+ }
+
+ // Add to the list of free options.
+ options.put(Integer.toString(free), args[i]);
+
+ // Move on to the next free argument.
+ free++;
+ }
+ }
+ }
+
+ // Scan through all the specified options to check that all mandatory options have been set and that all flags
+ // that were not set are set to false in the set of properties.
+ for (CommandLineOption optionInfo : optionMap.values())
+ {
+ // Check if this is a flag.
+ if (!optionInfo.expectsArgs)
+ {
+ // Check if the flag is not set in the properties and set it to false if so.
+ if (!options.containsKey(optionInfo.option))
+ {
+ options.put(optionInfo.option, "false");
+ }
+ }
+ // Check if this is a mandatory option and was not set.
+ else if (optionInfo.mandatory && !options.containsKey(optionInfo.option))
+ {
+ // Create an error for the missing option.
+ parsingErrors.add("Option -" + optionInfo.option + " is mandatory but not was not specified.\n");
+ }
+ }
+
+ // Check if there were any errors.
+ if (!parsingErrors.isEmpty())
+ {
+ // Throw an illegal argument exception to signify that there were parsing errors.
+ throw new IllegalArgumentException();
+ }
+
+ // Convert any name/value pairs in the free arguments into properties in the parsed options.
+ options = takeFreeArgsAsProperties(options, 1);
+
+ parsedProperties = options;
+
+ return options;
+ }
+
+ /**
+ * If a command line has been parsed, calling this method sets all of its parsed options into the specified properties.
+ */
+ public void addCommandLineToProperties(Properties properties)
+ {
+ if (parsedProperties != null)
+ {
+ for (Object propKey : parsedProperties.keySet())
+ {
+ String name = (String) propKey;
+ String value = parsedProperties.getProperty(name);
+
+ properties.setProperty(name, value);
+ }
+ }
+ }
+
+ /**
+ * Resets this command line parser after it has been used to parse a command line. This method will only need
+ * to be called to use this parser a second time which is not likely seeing as a command line is usually only
+ * specified once. However, it is exposed as a public method for the rare case where this may be done.
+ *
+ * <p/>Cleans the internal state of this parser, removing all stored errors and information about the options in
+ * force.
+ */
+ public void reset()
+ {
+ parsingErrors = new ArrayList<String>();
+ parsedProperties = null;
+ }
+
+ /**
+ * Adds the option to list of available command line options.
+ *
+ * @param option The option to add as an available command line option.
+ * @param comment A comment for the option.
+ * @param argument The text that appears after the option in the usage string.
+ * @param mandatory When true, indicates that this option is mandatory.
+ * @param formatRegexp The format that the argument must take, defined as a regular expression.
+ */
+ protected void addOption(String option, String comment, String argument, boolean mandatory, String formatRegexp)
+ {
+ // Check if usage text has been set in which case this option is expecting arguments.
+ boolean expectsArgs = ((argument == null) || argument.equals("")) ? false : true;
+
+ // Add the option to the map of command line options.
+ CommandLineOption opt = new CommandLineOption(option, expectsArgs, comment, argument, mandatory, formatRegexp);
+ optionMap.put(option, opt);
+ }
+
+ /**
+ * Converts the free arguments into property declarations. After parsing the command line the free arguments
+ * are numbered from 1, such that the parsed properties contain values for the keys "1", "2", ... This method
+ * converts any free arguments declared using the 'name=value' syntax into properties with key 'name', value
+ * 'value'.
+ *
+ * <p/>For example the comand line:
+ * <pre>
+ * ... debug=true
+ * </pre>
+ *
+ * <p/>After parsing has properties:
+ * <pre>[[1, debug=true]]</pre>
+ *
+ * <p/>After applying this method the properties are:
+ * <pre>[[1, debug=true], [debug, true]]</pre>
+ *
+ * @param properties The parsed command line properties.
+ * @param from The free argument index to convert to properties from.
+ *
+ * @return The parsed command line properties, with free argument name value pairs too.
+ */
+ private Properties takeFreeArgsAsProperties(Properties properties, int from)
+ {
+ for (int i = from; true; i++)
+ {
+ String nextFreeArg = properties.getProperty(Integer.toString(i));
+
+ // Terminate the loop once all free arguments have been consumed.
+ if (nextFreeArg == null)
+ {
+ break;
+ }
+
+ // Split it on the =, strip any whitespace and set it as a system property.
+ String[] nameValuePair = nextFreeArg.split("=");
+
+ if (nameValuePair.length == 2)
+ {
+ properties.setProperty(nameValuePair[0], nameValuePair[1]);
+ }
+ }
+
+ return properties;
+ }
+
+ /**
+ * Checks the format of an argument to an option against its specified regular expression format if one has
+ * been set. Any errors are added to the list of parsing errors.
+ *
+ * @param optionInfo The command line option information for the option which is havings its argument checked.
+ * @param matchedArg The string argument to the option.
+ */
+ private void checkArgumentFormat(CommandLineOption optionInfo, String matchedArg)
+ {
+ // Check if this option enforces a format for its argument.
+ if (optionInfo.argumentFormatRegexp != null)
+ {
+ Pattern pattern = Pattern.compile(optionInfo.argumentFormatRegexp);
+ Matcher argumentMatcher = pattern.matcher(matchedArg);
+
+ // Check if the argument does not meet its required format.
+ if (!argumentMatcher.matches())
+ {
+ // Create an error for this badly formed argument.
+ parsingErrors.add("The argument to option -" + optionInfo.option + " does not meet its required format.\n");
+ }
+ }
+ }
+
+ /**
+ * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
+ * a map of properties containing them.
+ *
+ * @param args The command line.
+ * @param commandLine The command line parser.
+ * @param properties The properties object to inject all parsed properties into (optional may be <tt>null</tt>).
+ *
+ * @return A set of properties containing all name=value pairs from the command line.
+ */
+ public static Properties processCommandLine(String[] args, CommandLineParser commandLine, Properties properties)
+ {
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
+
+ try
+ {
+ options = commandLine.parseCommandLine(args);
+
+ // Add all the trailing command line options (name=value pairs) to system properties. They may be picked up
+ // from there.
+ commandLine.addCommandLineToProperties(properties);
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
+ }
+
+ return options;
+ }
+
+ /**
+ * Holds information about a command line options. This includes what its name is, whether or not it is a flag,
+ * whether or not it is mandatory, what its user comment is, what its argument reminder text is and what its
+ * regular expression format is.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Hold details of a command line option.
+ * </table>
+ */
+ protected static class CommandLineOption
+ {
+ /** Holds the text for the flag to match this argument with. */
+ public String option = null;
+
+ /** Holds a string describing how to use this command line argument. */
+ public String argument = null;
+
+ /** Flag that determines whether or not this command line argument can take arguments. */
+ public boolean expectsArgs = false;
+
+ /** Holds a short comment describing what this command line argument is for. */
+ public String comment = null;
+
+ /** Flag that determines whether or not this is an mandatory command line argument. */
+ public boolean mandatory = false;
+
+ /** A regular expression describing what format the argument to this option muist have. */
+ public String argumentFormatRegexp = null;
+
+ /**
+ * Create a command line option object that holds specific information about a command line option.
+ *
+ * @param option The text that matches the option.
+ * @param expectsArgs Whether or not the option expects arguments. It is a flag if this is false.
+ * @param comment A comment explaining how to use this option.
+ * @param argument A short reminder of the format of the argument to this option/
+ * @param mandatory Set to true if this option is mandatory.
+ * @param formatRegexp The regular expression that the argument to this option must meet to be valid.
+ */
+ public CommandLineOption(String option, boolean expectsArgs, String comment, String argument, boolean mandatory,
+ String formatRegexp)
+ {
+ this.option = option;
+ this.expectsArgs = expectsArgs;
+ this.comment = comment;
+ this.argument = argument;
+ this.mandatory = mandatory;
+ this.argumentFormatRegexp = formatRegexp;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
new file mode 100644
index 0000000000..633cf4fe3a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class);
+
+ protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>();
+
+ protected AtomicInteger _messageHeadSize = new AtomicInteger(0);
+
+ @Override
+ public int size()
+ {
+ return super.size() + _messageHeadSize.get();
+ }
+
+ public int headSize()
+ {
+ return _messageHeadSize.get();
+ }
+
+ @Override
+ public E poll()
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.poll();
+ }
+ else
+ {
+ E e = _messageHead.poll();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Providing item(" + e + ")from message head");
+ }
+
+ if (e != null)
+ {
+ _messageHeadSize.decrementAndGet();
+ }
+
+ return e;
+ }
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+
+ if (_messageHead.isEmpty())
+ {
+ return super.remove(o);
+ }
+ else
+ {
+ if (_messageHead.remove(o))
+ {
+ _messageHeadSize.decrementAndGet();
+
+ return true;
+ }
+
+ return super.remove(o);
+ }
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c)
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.removeAll(c);
+ }
+ else
+ {
+ // fixme this is super.removeAll but iterator here doesn't work
+ // we need to be able to correctly decrement _messageHeadSize
+ // boolean modified = false;
+ // Iterator<?> e = iterator();
+ // while (e.hasNext())
+ // {
+ // if (c.contains(e.next()))
+ // {
+ // e.remove();
+ // modified = true;
+ // _size.decrementAndGet();
+ // }
+ // }
+ // return modified;
+
+ throw new RuntimeException("Not implemented");
+ }
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return (_messageHead.isEmpty() && super.isEmpty());
+ }
+
+ @Override
+ public void clear()
+ {
+ super.clear();
+ _messageHead.clear();
+ }
+
+ @Override
+ public boolean contains(Object o)
+ {
+ return _messageHead.contains(o) || super.contains(o);
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> o)
+ {
+ return _messageHead.containsAll(o) || super.containsAll(o);
+ }
+
+ @Override
+ public E element()
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.element();
+ }
+ else
+ {
+ return _messageHead.element();
+ }
+ }
+
+ @Override
+ public E peek()
+ {
+ if (_messageHead.isEmpty())
+ {
+ return super.peek();
+ }
+ else
+ {
+ E o = _messageHead.peek();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Peeking item (" + o + ") from message head");
+ }
+
+ return o;
+ }
+
+ }
+
+ @Override
+ public Iterator<E> iterator()
+ {
+ final Iterator<E> mainMessageIterator = super.iterator();
+
+ return new Iterator<E>()
+ {
+ final Iterator<E> _headIterator = _messageHead.iterator();
+ final Iterator<E> _mainIterator = mainMessageIterator;
+
+ Iterator<E> last;
+
+ public boolean hasNext()
+ {
+ return _headIterator.hasNext() || _mainIterator.hasNext();
+ }
+
+ public E next()
+ {
+ if (_headIterator.hasNext())
+ {
+ last = _headIterator;
+
+ return _headIterator.next();
+ }
+ else
+ {
+ last = _mainIterator;
+
+ return _mainIterator.next();
+ }
+ }
+
+ public void remove()
+ {
+ last.remove();
+ if(last == _mainIterator)
+ {
+ _size.decrementAndGet();
+ }
+ else
+ {
+ _messageHeadSize.decrementAndGet();
+ }
+ }
+ };
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c)
+ {
+ throw new RuntimeException("Not Implemented");
+ }
+
+ @Override
+ public Object[] toArray()
+ {
+ throw new RuntimeException("Not Implemented");
+ }
+
+ public boolean pushHead(E o)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Adding item(" + o + ") to head of queue");
+ }
+
+ if (_messageHead.offer(o))
+ {
+ _messageHeadSize.incrementAndGet();
+
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java
new file mode 100644
index 0000000000..c4d7683a02
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ConcurrentLinkedQueueAtomicSize<E> extends ConcurrentLinkedQueue<E>
+{
+ AtomicInteger _size = new AtomicInteger(0);
+
+ public int size()
+ {
+ return _size.get();
+ }
+
+ public boolean offer(E o)
+ {
+
+ if (super.offer(o))
+ {
+ _size.incrementAndGet();
+ return true;
+ }
+
+ return false;
+ }
+
+ public E poll()
+ {
+ E e = super.poll();
+
+ if (e != null)
+ {
+ _size.decrementAndGet();
+ }
+
+ return e;
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+ if (super.remove(o))
+ {
+ _size.decrementAndGet();
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java
new file mode 100644
index 0000000000..1f168345a1
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E>
+{
+ public int size()
+ {
+ if (isEmpty())
+ {
+ return 0;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
new file mode 100644
index 0000000000..1a57af9bf7
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
@@ -0,0 +1,395 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * FileUtils provides some simple helper methods for working with files. It follows the convention of wrapping all
+ * checked exceptions as runtimes, so code using these methods is free of try-catch blocks but does not expect to
+ * recover from errors.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Read a text file as a string.
+ * <tr><td> Open a file or default resource as an input stream.
+ * </table>
+ */
+public class FileUtils
+{
+ /**
+ * Reads a text file as a string.
+ *
+ * @param filename The name of the file.
+ *
+ * @return The contents of the file.
+ */
+ public static String readFileAsString(String filename)
+ {
+ BufferedInputStream is = null;
+
+ try
+ {
+ try
+ {
+ is = new BufferedInputStream(new FileInputStream(filename));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return readStreamAsString(is);
+ }
+ finally
+ {
+ if (is != null)
+ {
+ try
+ {
+ is.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Reads a text file as a string.
+ *
+ * @param file The file.
+ *
+ * @return The contents of the file.
+ */
+ public static String readFileAsString(File file)
+ {
+ BufferedInputStream is = null;
+
+ try
+ {
+ is = new BufferedInputStream(new FileInputStream(file));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return readStreamAsString(is);
+ }
+
+ /**
+ * Reads the contents of a reader, one line at a time until the end of stream is encountered, and returns all
+ * together as a string.
+ *
+ * @param is The reader.
+ *
+ * @return The contents of the reader.
+ */
+ private static String readStreamAsString(BufferedInputStream is)
+ {
+ try
+ {
+ byte[] data = new byte[4096];
+
+ StringBuffer inBuffer = new StringBuffer();
+
+ String line;
+ int read;
+
+ while ((read = is.read(data)) != -1)
+ {
+ String s = new String(data, 0, read);
+ inBuffer.append(s);
+ }
+
+ return inBuffer.toString();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Either opens the specified filename as an input stream, or uses the default resource loaded using the
+ * specified class loader, if opening the file fails or no file name is specified.
+ *
+ * @param filename The name of the file to open.
+ * @param defaultResource The name of the default resource on the classpath if the file cannot be opened.
+ * @param cl The classloader to load the default resource with.
+ *
+ * @return An input stream for the file or resource, or null if one could not be opened.
+ */
+ public static InputStream openFileOrDefaultResource(String filename, String defaultResource, ClassLoader cl)
+ {
+ InputStream is = null;
+
+ // Flag to indicate whether the default resource should be used. By default this is true, so that the default
+ // is used when opening the file fails.
+ boolean useDefault = true;
+
+ // Try to open the file if one was specified.
+ if (filename != null)
+ {
+ try
+ {
+ is = new BufferedInputStream(new FileInputStream(new File(filename)));
+
+ // Clear the default flag because the file was succesfully opened.
+ useDefault = false;
+ }
+ catch (FileNotFoundException e)
+ {
+ // Ignore this exception, the default will be used instead.
+ }
+ }
+
+ // Load the default resource if a file was not specified, or if opening the file failed.
+ if (useDefault)
+ {
+ is = cl.getResourceAsStream(defaultResource);
+ }
+
+ return is;
+ }
+
+ /**
+ * Copies the specified source file to the specified destintaion file. If the destinationst file does not exist,
+ * it is created.
+ *
+ * @param src The source file name.
+ * @param dst The destination file name.
+ */
+ public static void copy(File src, File dst)
+ {
+ try
+ {
+ copyCheckedEx(src, dst);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Copies the specified source file to the specified destination file. If the destination file does not exist,
+ * it is created.
+ *
+ * @param src The source file name.
+ * @param dst The destination file name.
+ * @throws IOException
+ */
+ public static void copyCheckedEx(File src, File dst) throws IOException
+ {
+ InputStream in = new FileInputStream(src);
+ try
+ {
+ if (!dst.exists())
+ {
+ dst.createNewFile();
+ }
+
+ OutputStream out = new FileOutputStream(dst);
+
+ try
+ {
+ // Transfer bytes from in to out
+ byte[] buf = new byte[1024];
+ int len;
+ while ((len = in.read(buf)) > 0)
+ {
+ out.write(buf, 0, len);
+ }
+ }
+ finally
+ {
+ out.close();
+ }
+ }
+ finally
+ {
+ in.close();
+ }
+ }
+
+ /*
+ * Deletes a given file
+ */
+ public static boolean deleteFile(String filePath)
+ {
+ return delete(new File(filePath), false);
+ }
+
+ /*
+ * Deletes a given empty directory
+ */
+ public static boolean deleteDirectory(String directoryPath)
+ {
+ File directory = new File(directoryPath);
+
+ if (directory.isDirectory())
+ {
+ if (directory.listFiles().length == 0)
+ {
+ return delete(directory, true);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Delete a given file/directory,
+ * A directory will always require the recursive flag to be set.
+ * if a directory is specified and recursive set then delete the whole tree
+ *
+ * @param file the File object to start at
+ * @param recursive boolean to recurse if a directory is specified.
+ *
+ * @return <code>true</code> if and only if the file or directory is
+ * successfully deleted; <code>false</code> otherwise
+ */
+ public static boolean delete(File file, boolean recursive)
+ {
+ boolean success = true;
+
+ if (file.isDirectory())
+ {
+ if (recursive)
+ {
+ File[] files = file.listFiles();
+
+ // This can occur if the file is deleted outside the JVM
+ if (files == null)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < files.length; i++)
+ {
+ success = delete(files[i], true) && success;
+ }
+
+ return success && file.delete();
+ }
+
+ return false;
+ }
+
+ return file.delete();
+ }
+
+ public static class UnableToCopyException extends Exception
+ {
+ UnableToCopyException(String msg)
+ {
+ super(msg);
+ }
+ }
+
+ public static void copyRecursive(File source, File dst) throws FileNotFoundException, UnableToCopyException
+ {
+
+ if (!source.exists())
+ {
+ throw new FileNotFoundException("Unable to copy '" + source.toString() + "' as it does not exist.");
+ }
+
+ if (dst.exists() && !dst.isDirectory())
+ {
+ throw new IllegalArgumentException("Unable to copy '" + source.toString() + "' to '" + dst + "' a file with same name exists.");
+ }
+
+ if (source.isFile())
+ {
+ copy(source, dst);
+ }
+
+ //else we have a source directory
+ if (!dst.isDirectory() && !dst.mkdirs())
+ {
+ throw new UnableToCopyException("Unable to create destination directory");
+ }
+
+ for (File file : source.listFiles())
+ {
+ if (file.isFile())
+ {
+ copy(file, new File(dst.toString() + File.separator + file.getName()));
+ }
+ else
+ {
+ copyRecursive(file, new File(dst + File.separator + file.getName()));
+ }
+ }
+
+ }
+
+ /**
+ * Checks the specified file for instances of the search string.
+ *
+ * @param file the file to search
+ * @param search the search String
+ *
+ * @throws java.io.IOException
+ * @return the list of matching entries
+ */
+ public static List<String> searchFile(File file, String search)
+ throws IOException
+ {
+
+ List<String> results = new LinkedList<String>();
+
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+ try
+ {
+ while (reader.ready())
+ {
+ String line = reader.readLine();
+ if (line.contains(search))
+ {
+ results.add(line);
+ }
+ }
+ }
+ finally
+ {
+ reader.close();
+ }
+
+ return results;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java
new file mode 100644
index 0000000000..b5efaa61b6
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.Queue;
+
+/**
+ * Defines a queue that has a push operation to add an element to the head of the queue.
+ *
+ * @todo Seems like this may be pointless, the implementation uses this method to increment the message count
+ * then calls offer. Why not simply override offer and drop this interface?
+ */
+public interface MessageQueue<E> extends Queue<E>
+{
+ /**
+ * Inserts the specified element into this queue, if possible. When using queues that may impose insertion
+ * restrictions (for example capacity bounds), method offer is generally preferable to method Collection.add(E),
+ * which can fail to insert an element only by throwing an exception.
+ *
+ * @param o The element to insert.
+ *
+ * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
+ */
+ boolean pushHead(E o);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java b/qpid/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java
new file mode 100644
index 0000000000..e764c8536b
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/NameUUIDGen.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+
+/**
+ * NameUUIDGen
+ *
+ */
+
+public final class NameUUIDGen implements UUIDGen
+{
+
+ private static final int WIDTH = 8;
+
+ final private byte[] seed;
+ final private ByteBuffer seedBuf;
+ private long counter;
+
+ public NameUUIDGen()
+ {
+ String namespace = UUID.randomUUID().toString();
+ this.seed = new byte[namespace.length() + WIDTH];
+ for (int i = WIDTH; i < seed.length; i++)
+ {
+ seed[i] = (byte) namespace.charAt(i - WIDTH);
+ }
+ this.seedBuf = ByteBuffer.wrap(seed);
+ this.counter = 0;
+ }
+
+ public UUID generate()
+ {
+ seedBuf.putLong(0, counter++);
+ return UUID.nameUUIDFromBytes(seed);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java b/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java
new file mode 100644
index 0000000000..4c653e6ca0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/NetMatcher.java
@@ -0,0 +1,264 @@
+/***********************************************************************
+ * Copyright (c) 2000-2006 The Apache Software Foundation. *
+ * All rights reserved. *
+ * ------------------------------------------------------------------- *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you *
+ * may not use this file except in compliance with the License. You *
+ * may obtain a copy of the License at: *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, software *
+ * distributed under the License is distributed on an "AS IS" BASIS, *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
+ * implied. See the License for the specific language governing *
+ * permissions and limitations under the License. *
+ ***********************************************************************/
+
+package org.apache.qpid.util;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+public class NetMatcher
+{
+ private ArrayList networks;
+
+ public void initInetNetworks(final Collection nets)
+ {
+ networks = new ArrayList();
+ for (Iterator iter = nets.iterator(); iter.hasNext(); ) try
+ {
+ InetNetwork net = InetNetwork.getFromString((String) iter.next());
+ if (!networks.contains(net)) networks.add(net);
+ }
+ catch (java.net.UnknownHostException uhe)
+ {
+ log("Cannot resolve address: " + uhe.getMessage());
+ }
+ networks.trimToSize();
+ }
+
+ public void initInetNetworks(final String[] nets)
+ {
+ networks = new ArrayList();
+ for (int i = 0; i < nets.length; i++) try
+ {
+ InetNetwork net = InetNetwork.getFromString(nets[i]);
+ if (!networks.contains(net)) networks.add(net);
+ }
+ catch (java.net.UnknownHostException uhe)
+ {
+ log("Cannot resolve address: " + uhe.getMessage());
+ }
+ networks.trimToSize();
+ }
+
+ public boolean matchInetNetwork(final String hostIP)
+ {
+ InetAddress ip = null;
+
+ try
+ {
+ ip = InetAddress.getByName(hostIP);
+ }
+ catch (java.net.UnknownHostException uhe)
+ {
+ log("Cannot resolve address for " + hostIP + ": " + uhe.getMessage());
+ }
+
+ boolean sameNet = false;
+
+ if (ip != null) for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); )
+ {
+ InetNetwork network = (InetNetwork) iter.next();
+ sameNet = network.contains(ip);
+ }
+ return sameNet;
+ }
+
+ public boolean matchInetNetwork(final InetAddress ip)
+ {
+ boolean sameNet = false;
+
+ for (Iterator iter = networks.iterator(); (!sameNet) && iter.hasNext(); )
+ {
+ InetNetwork network = (InetNetwork) iter.next();
+ sameNet = network.contains(ip);
+ }
+ return sameNet;
+ }
+
+ public NetMatcher()
+ {
+ }
+
+ public NetMatcher(final String[] nets)
+ {
+ initInetNetworks(nets);
+ }
+
+ public NetMatcher(final Collection nets)
+ {
+ initInetNetworks(nets);
+ }
+
+ public String toString() {
+ return networks.toString();
+ }
+
+ protected void log(String s) { }
+}
+
+class InetNetwork
+{
+ /*
+ * Implements network masking, and is compatible with RFC 1518 and
+ * RFC 1519, which describe CIDR: Classless Inter-Domain Routing.
+ */
+
+ private InetAddress network;
+ private InetAddress netmask;
+
+ public InetNetwork(InetAddress ip, InetAddress netmask)
+ {
+ network = maskIP(ip, netmask);
+ this.netmask = netmask;
+ }
+
+ public boolean contains(final String name) throws java.net.UnknownHostException
+ {
+ return network.equals(maskIP(InetAddress.getByName(name), netmask));
+ }
+
+ public boolean contains(final InetAddress ip)
+ {
+ return network.equals(maskIP(ip, netmask));
+ }
+
+ public String toString()
+ {
+ return network.getHostAddress() + "/" + netmask.getHostAddress();
+ }
+
+ public int hashCode()
+ {
+ return maskIP(network, netmask).hashCode();
+ }
+
+ public boolean equals(Object obj)
+ {
+ return (obj != null) && (obj instanceof InetNetwork) &&
+ ((((InetNetwork)obj).network.equals(network)) && (((InetNetwork)obj).netmask.equals(netmask)));
+ }
+
+ public static InetNetwork getFromString(String netspec) throws java.net.UnknownHostException
+ {
+ if (netspec.endsWith("*")) netspec = normalizeFromAsterisk(netspec);
+ else
+ {
+ int iSlash = netspec.indexOf('/');
+ if (iSlash == -1) netspec += "/255.255.255.255";
+ else if (netspec.indexOf('.', iSlash) == -1) netspec = normalizeFromCIDR(netspec);
+ }
+
+ return new InetNetwork(InetAddress.getByName(netspec.substring(0, netspec.indexOf('/'))),
+ InetAddress.getByName(netspec.substring(netspec.indexOf('/') + 1)));
+ }
+
+ public static InetAddress maskIP(final byte[] ip, final byte[] mask)
+ {
+ try
+ {
+ return getByAddress(new byte[]
+ {
+ (byte) (mask[0] & ip[0]),
+ (byte) (mask[1] & ip[1]),
+ (byte) (mask[2] & ip[2]),
+ (byte) (mask[3] & ip[3])
+ });
+ }
+ catch(Exception _) {}
+ {
+ return null;
+ }
+ }
+
+ public static InetAddress maskIP(final InetAddress ip, final InetAddress mask)
+ {
+ return maskIP(ip.getAddress(), mask.getAddress());
+ }
+
+ /*
+ * This converts from an uncommon "wildcard" CIDR format
+ * to "address + mask" format:
+ *
+ * * => 000.000.000.0/000.000.000.0
+ * xxx.* => xxx.000.000.0/255.000.000.0
+ * xxx.xxx.* => xxx.xxx.000.0/255.255.000.0
+ * xxx.xxx.xxx.* => xxx.xxx.xxx.0/255.255.255.0
+ */
+ static private String normalizeFromAsterisk(final String netspec)
+ {
+ String[] masks = { "0.0.0.0/0.0.0.0", "0.0.0/255.0.0.0", "0.0/255.255.0.0", "0/255.255.255.0" };
+ char[] srcb = netspec.toCharArray();
+ int octets = 0;
+ for (int i = 1; i < netspec.length(); i++) {
+ if (srcb[i] == '.') octets++;
+ }
+ return (octets == 0) ? masks[0] : netspec.substring(0, netspec.length() -1 ).concat(masks[octets]);
+ }
+
+ /*
+ * RFC 1518, 1519 - Classless Inter-Domain Routing (CIDR)
+ * This converts from "prefix + prefix-length" format to
+ * "address + mask" format, e.g. from xxx.xxx.xxx.xxx/yy
+ * to xxx.xxx.xxx.xxx/yyy.yyy.yyy.yyy.
+ */
+ static private String normalizeFromCIDR(final String netspec)
+ {
+ final int bits = 32 - Integer.parseInt(netspec.substring(netspec.indexOf('/')+1));
+ final int mask = (bits == 32) ? 0 : 0xFFFFFFFF - ((1 << bits)-1);
+
+ return netspec.substring(0, netspec.indexOf('/') + 1) +
+ Integer.toString(mask >> 24 & 0xFF, 10) + "." +
+ Integer.toString(mask >> 16 & 0xFF, 10) + "." +
+ Integer.toString(mask >> 8 & 0xFF, 10) + "." +
+ Integer.toString(mask >> 0 & 0xFF, 10);
+ }
+
+ private static java.lang.reflect.Method getByAddress = null;
+
+ static {
+ try {
+ Class inetAddressClass = Class.forName("java.net.InetAddress");
+ Class[] parameterTypes = { byte[].class };
+ getByAddress = inetAddressClass.getMethod("getByAddress", parameterTypes);
+ } catch (Exception e) {
+ getByAddress = null;
+ }
+ }
+
+ private static InetAddress getByAddress(byte[] ip) throws java.net.UnknownHostException
+ {
+ InetAddress addr = null;
+ if (getByAddress != null) try {
+ addr = (InetAddress) getByAddress.invoke(null, new Object[] { ip });
+ } catch (IllegalAccessException e) {
+ } catch (java.lang.reflect.InvocationTargetException e) {
+ }
+
+ if (addr == null) {
+ addr = InetAddress.getByName
+ (
+ Integer.toString(ip[0] & 0xFF, 10) + "." +
+ Integer.toString(ip[1] & 0xFF, 10) + "." +
+ Integer.toString(ip[2] & 0xFF, 10) + "." +
+ Integer.toString(ip[3] & 0xFF, 10)
+ );
+ }
+ return addr;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java
new file mode 100644
index 0000000000..93266f2486
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+/**
+ * Contains pretty printing convenienve methods for producing formatted logging output, mostly for debugging purposes.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays.
+ */
+public class PrettyPrintingUtils
+{
+ /**
+ * Pretty prints an array of ints as a string.
+ *
+ * @param array The array to pretty print.
+ *
+ * @return The pretty printed string.
+ */
+ public static String printArray(int[] array)
+ {
+ StringBuilder result = new StringBuilder("[");
+ for (int i = 0; i < array.length; i++)
+ {
+ result.append(array[i])
+ .append((i < (array.length - 1)) ? ", " : "");
+ }
+
+ result.append(']');
+
+ return result.toString();
+ }
+
+ /**
+ * Pretty prints an array of strings as a string.
+ *
+ * @param array The array to pretty print.
+ *
+ * @return The pretty printed string.
+ */
+ public static String printArray(String[] array)
+ {
+ String result = "[";
+ for (int i = 0; i < array.length; i++)
+ {
+ result += array[i];
+ result += (i < (array.length - 1)) ? ", " : "";
+ }
+
+ result += "]";
+
+ return result;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java b/qpid/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java
new file mode 100644
index 0000000000..60b402a105
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/RandomUUIDGen.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.UUID;
+
+
+/**
+ * RandomUUIDGen
+ *
+ */
+
+public final class RandomUUIDGen implements UUIDGen
+{
+
+ public UUID generate()
+ {
+ return UUID.randomUUID();
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java
new file mode 100644
index 0000000000..8ad9d00f54
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Serial.java
@@ -0,0 +1,108 @@
+package org.apache.qpid.util;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Comparator;
+
+import org.apache.qpid.SerialException;
+
+/**
+ * This class provides basic serial number comparisons as defined in
+ * RFC 1982.
+ */
+
+public class Serial
+{
+
+ public static final Comparator<Integer> COMPARATOR = new Comparator<Integer>()
+ {
+ public int compare(Integer s1, Integer s2)
+ {
+ return Serial.compare(s1, s2);
+ }
+ };
+
+ /**
+ * Compares two numbers using serial arithmetic.
+ *
+ * @param s1 the first serial number
+ * @param s2 the second serial number
+ *
+ * @return a negative integer, zero, or a positive integer as the
+ * first argument is less than, equal to, or greater than the
+ * second
+ */
+ public static final int compare(int s1, int s2)
+ {
+ return s1 - s2;
+ }
+
+ public static final boolean lt(int s1, int s2)
+ {
+ return compare(s1, s2) < 0;
+ }
+
+ public static final boolean le(int s1, int s2)
+ {
+ return compare(s1, s2) <= 0;
+ }
+
+ public static final boolean gt(int s1, int s2)
+ {
+ return compare(s1, s2) > 0;
+ }
+
+ public static final boolean ge(int s1, int s2)
+ {
+ return compare(s1, s2) >= 0;
+ }
+
+ public static final boolean eq(int s1, int s2)
+ {
+ return s1 == s2;
+ }
+
+ public static final int min(int s1, int s2)
+ {
+ if (lt(s1, s2))
+ {
+ return s1;
+ }
+ else
+ {
+ return s2;
+ }
+ }
+
+ public static final int max(int s1, int s2)
+ {
+ if (gt(s1, s2))
+ {
+ return s1;
+ }
+ else
+ {
+ return s2;
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
new file mode 100644
index 0000000000..a6a8b8beb4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
@@ -0,0 +1,260 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.UnsupportedEncodingException;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Strings
+ *
+ */
+
+public final class Strings
+{
+
+ private static final byte[] EMPTY = new byte[0];
+
+ private static final ThreadLocal<char[]> charbuf = new ThreadLocal<char[]>()
+ {
+ public char[] initialValue()
+ {
+ return new char[4096];
+ }
+ };
+
+ public static final byte[] toUTF8(String str)
+ {
+ if (str == null)
+ {
+ return EMPTY;
+ }
+ else
+ {
+ final int size = str.length();
+ char[] chars = charbuf.get();
+ if (size > chars.length)
+ {
+ chars = new char[Math.max(size, 2*chars.length)];
+ charbuf.set(chars);
+ }
+
+ str.getChars(0, size, chars, 0);
+ final byte[] bytes = new byte[size];
+ for (int i = 0; i < size; i++)
+ {
+ if (chars[i] > 127)
+ {
+ try
+ {
+ return str.getBytes("UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ bytes[i] = (byte) chars[i];
+ }
+ return bytes;
+ }
+ }
+
+ public static final String fromUTF8(byte[] bytes)
+ {
+ try
+ {
+ return new String(bytes, "UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final Pattern VAR = Pattern.compile("(?:\\$\\{([^\\}]*)\\})|(?:\\$(\\$))");
+
+ public static interface Resolver
+ {
+ String resolve(String variable);
+ }
+
+ public static class MapResolver implements Resolver
+ {
+
+ private final Map<String,String> map;
+
+ public MapResolver(Map<String,String> map)
+ {
+ this.map = map;
+ }
+
+ public String resolve(String variable)
+ {
+ return map.get(variable);
+ }
+ }
+
+ public static class PropertiesResolver implements Resolver
+ {
+
+ private final Properties properties;
+
+ public PropertiesResolver(Properties properties)
+ {
+ this.properties = properties;
+ }
+
+ public String resolve(String variable)
+ {
+ return properties.getProperty(variable);
+ }
+ }
+
+ public static class ChainedResolver implements Resolver
+ {
+ private final Resolver primary;
+ private final Resolver secondary;
+
+ public ChainedResolver(Resolver primary, Resolver secondary)
+ {
+ this.primary = primary;
+ this.secondary = secondary;
+ }
+
+ public String resolve(String variable)
+ {
+ String result = primary.resolve(variable);
+ if (result == null)
+ {
+ result = secondary.resolve(variable);
+ }
+ return result;
+ }
+ }
+
+ public static final Resolver SYSTEM_RESOLVER = new Resolver()
+ {
+ public String resolve(String variable)
+ {
+ String result = System.getProperty(variable);
+ if (result == null)
+ {
+ result = System.getenv(variable);
+ }
+ return result;
+ }
+ };
+
+ public static final String expand(String input)
+ {
+ return expand(input, SYSTEM_RESOLVER);
+ }
+
+ public static final String expand(String input, Resolver resolver)
+ {
+ return expand(input, resolver, new Stack<String>());
+ }
+
+ private static final String expand(String input, Resolver resolver, Stack<String> stack)
+ {
+ Matcher m = VAR.matcher(input);
+ StringBuffer result = new StringBuffer();
+ while (m.find())
+ {
+ String var = m.group(1);
+ if (var == null)
+ {
+ String esc = m.group(2);
+ if ("$".equals(esc))
+ {
+ m.appendReplacement(result, Matcher.quoteReplacement("$"));
+ }
+ else
+ {
+ throw new IllegalArgumentException(esc);
+ }
+ }
+ else
+ {
+ m.appendReplacement(result, Matcher.quoteReplacement(resolve(var, resolver, stack)));
+ }
+ }
+ m.appendTail(result);
+ return result.toString();
+ }
+
+ private static final String resolve(String var, Resolver resolver, Stack<String> stack)
+ {
+ if (stack.contains(var))
+ {
+ throw new IllegalArgumentException
+ (String.format("recursively defined variable: %s stack=%s", var,
+ stack));
+ }
+
+ String result = resolver.resolve(var);
+ if (result == null)
+ {
+ throw new IllegalArgumentException("no such variable: " + var);
+ }
+
+ stack.push(var);
+ try
+ {
+ return expand(result, resolver, stack);
+ }
+ finally
+ {
+ stack.pop();
+ }
+ }
+
+ public static final String join(String sep, Iterable items)
+ {
+ StringBuilder result = new StringBuilder();
+
+ for (Object o : items)
+ {
+ if (result.length() > 0)
+ {
+ result.append(sep);
+ }
+ result.append(o.toString());
+ }
+
+ return result.toString();
+ }
+
+ public static final String join(String sep, Object[] items)
+ {
+ return join(sep, Arrays.asList(items));
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java
new file mode 100644
index 0000000000..3cfe5afdac
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDGen.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+
+import java.util.UUID;
+
+/**
+ * UUIDGen
+ *
+ */
+
+public interface UUIDGen
+{
+
+ public UUID generate();
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java
new file mode 100644
index 0000000000..4bf6b7f0a2
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/UUIDs.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+
+/**
+ * UUIDs
+ *
+ */
+
+public final class UUIDs
+{
+
+ public static final UUIDGen newGenerator()
+ {
+ return newGenerator(System.getProperty("qpid.uuid.generator",
+ NameUUIDGen.class.getName()));
+ }
+
+ public static UUIDGen newGenerator(String name)
+ {
+ try
+ {
+ Class cls = Class.forName(name);
+ return (UUIDGen) cls.newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java
new file mode 100644
index 0000000000..e0c0337898
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java
@@ -0,0 +1,34 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * Used to signal that a data element and its producer cannot be requeued or sent an error message when using a
+ * {@link BatchSynchQueue} because the producer has already been unblocked by an unblocking take on the queue.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Signal that an unblocking take has already occurred.
+ * </table>
+ */
+public class AlreadyUnblockedException extends RuntimeException
+{ }
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java
new file mode 100644
index 0000000000..63d8f77edb
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java
@@ -0,0 +1,122 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this
+ * pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform usefull
+ * work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is
+ * consumed or until a consumer chooses to release the producer some time after consuming the data from the queue.
+ *
+ * <p>There are a number of possible advantages to using this technique when compared with having the producers
+ * processing their own data:
+ *
+ * <ul>
+ * <li>Data may be deposited asynchronously in the buffer allowing the producers to continue running.</li>
+ * <li>Data may be deposited synchronously in the buffer so that producers wait until their data has been processed
+ * before being allowed to continue.</li>
+ * <li>Variable rates of production/consumption can be smoothed over by the buffer as it provides space in memory to
+ * hold data between production and consumption.</li>
+ * <li>Consumers may be able to batch data as they consume it leading to more efficient consumption over
+ * individual data item consumption where latency associated with the consume operation can be ammortized.
+ * For example, it may be possibly to ammortize the cost of a disk seek over many producers.</li>
+ * <li>Data from seperate threads can be combined together in the buffer, providing a convenient way of spreading work
+ * amongst many workers and gathering the results together again.</li>
+ * <li>Different types of queue can be used to hold the buffer, resulting in different processing orders. For example,
+ * lifo, fifo, priority heap, etc.</li>
+ * </ul>
+ *
+ * <p/>The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package
+ * (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the
+ * blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional
+ * take methods that can be used to take data from a queue without releasing producers, so that consumers have an
+ * opportunity to confirm correct processing of the data before producers are released. It also adds a put method with
+ * exceptions so that consumers can signal exception cases back to producers where there are errors in the data.
+ *
+ * <p/>This type of queue is usefull in situations where consumers can obtain an efficiency gain by batching data
+ * from many threads but where synchronous handling of that data is neccessary because producers need to know that
+ * their data has been processed before they continue. For example, sending a bundle of messages together, or writing
+ * many records to disk at once, may result in improved performance but the originators of the messages or disk records
+ * need confirmation that their data has really been sent or saved to disk.
+ *
+ * <p/>The consumer can put an element back onto the queue or send an error message to the elements producer using the
+ * {@link SynchRecord} interface.
+ *
+ * <p/>The {@link #take()}, {@link #drainTo(java.util.Collection<? super E>)} and
+ * {@link #drainTo(java.util.Collection<? super E>, int)} methods from {@link BlockingQueue} should behave as if they
+ * have been called with unblock set to false. That is they take elements from the queue but leave the producers
+ * blocked. These methods do not return collections of {@link SynchRecord}s so they do not supply an interface through
+ * which errors or re-queuings can be applied. If these methods are used then the consumer must succesfully process
+ * all the records it takes.
+ *
+ * <p/>The {@link #put} method should silently swallow any exceptions that consumers attempt to return to the caller.
+ * In order to handle exceptions the {@link #tryPut} method must be used.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Handle synchronous puts, with possible exceptions.
+ * <tr><td> Allow consumers to take many records from a queue in a batch.
+ * <tr><td> Allow consumers to decide when to unblock synchronous producers.
+ * </table>
+ */
+public interface BatchSynchQueue<E> extends BlockingQueue<E>
+{
+ /**
+ * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
+ * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
+ *
+ * @param e The data element to put into the queue.
+ *
+ * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
+ * on its entry in the queue being consumed.
+ * @throws SynchException If a consumer encounters an error whilst processing the data element.
+ */
+ public void tryPut(E e) throws InterruptedException, SynchException;
+
+ /**
+ * Takes all available data items from the queue or blocks until some become available. The returned items
+ * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+ * producers, where the producers are still blocked.
+ *
+ * @param c The collection to drain the data items into.
+ * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+ *
+ * @return A count of the number of elements that were drained from the queue.
+ */
+ public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock);
+
+ /**
+ * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
+ * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+ * producers, where the producers are still blocked.
+ *
+ * @param c The collection to drain the data items into.
+ * @param maxElements The maximum number of elements to drain.
+ * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+ *
+ * @return A count of the number of elements that were drained from the queue.
+ */
+ public SynchRef drainTo(Collection<SynchRecord<E>> c, int maxElements, boolean unblock);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
new file mode 100644
index 0000000000..4564b1d686
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
@@ -0,0 +1,834 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
+ * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being
+ * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and
+ * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is
+ * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch.
+ * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.
+ *
+ * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete
+ * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract},
+ * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement
+ * the buffer other than by a queue, for example, by using an array.
+ *
+ * <p/>Normal queue methods to work asynchronously.
+ * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
+ * when their data is taken.
+ * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
+ * option to keep producers blocked until the consumer decides to release them.
+ *
+ * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to
+ * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency
+ * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io)
+ * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the
+ * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an
+ * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to
+ * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
+{
+ /** Used for logging. */
+ private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);
+
+ /** Holds a reference to the queue implementation that holds the buffer. */
+ Queue<SynchRecordImpl<E>> buffer;
+
+ /** Holds the number of items in the queue */
+ private int count;
+
+ /** Main lock guarding all access */
+ private ReentrantLock lock;
+
+ /** Condition for waiting takes */
+ private Condition notEmpty;
+
+ /** Condition for waiting puts */
+ private Condition notFull;
+
+ /**
+ * Creates a batch synch queue without fair thread scheduling.
+ */
+ public BatchSynchQueueBase()
+ {
+ this(false);
+ }
+
+ /**
+ * Ensures that the underlying buffer implementation is created.
+ *
+ * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer.
+ */
+ public BatchSynchQueueBase(boolean fair)
+ {
+ buffer = this.createQueue();
+
+ // Create the buffer lock with the fairness flag set accordingly.
+ lock = new ReentrantLock(fair);
+
+ // Create the non-empty and non-full condition monitors on the buffer lock.
+ notEmpty = lock.newCondition();
+ notFull = lock.newCondition();
+ }
+
+ /**
+ * Returns an iterator over the elements contained in this collection.
+ *
+ * @return An iterator over the elements contained in this collection.
+ */
+ public Iterator<E> iterator()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ /**
+ * Returns the number of elements in this collection. If the collection contains more than
+ * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>.
+ *
+ * @return The number of elements in this collection.
+ */
+ public int size()
+ {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+
+ try
+ {
+ return count;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Inserts the specified element into this queue, if possible. When using queues that may impose insertion
+ * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method
+ * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception.
+ *
+ * @param e The element to insert.
+ *
+ * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
+ */
+ public boolean offer(E e)
+ {
+ if (e == null)
+ {
+ throw new NullPointerException();
+ }
+
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+
+ try
+ {
+ return insert(e, false);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to
+ * become available.
+ *
+ * @param e The element to add.
+ * @param timeout How long to wait before giving up, in units of <tt>unit</tt>
+ * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
+ *
+ * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is
+ * available.
+ *
+ * @throws InterruptedException If interrupted while waiting.
+ * @throws NullPointerException If the specified element is <tt>null</tt>.
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ if (e == null)
+ {
+ throw new NullPointerException();
+ }
+
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+
+ long nanos = unit.toNanos(timeout);
+
+ try
+ {
+ do
+ {
+ if (insert(e, false))
+ {
+ return true;
+ }
+
+ try
+ {
+ nanos = notFull.awaitNanos(nanos);
+ }
+ catch (InterruptedException ie)
+ {
+ notFull.signal(); // propagate to non-interrupted thread
+ throw ie;
+ }
+ }
+ while (nanos > 0);
+
+ return false;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty.
+ *
+ * @return The head of this queue, or <tt>null</tt> if this queue is empty.
+ */
+ public E poll()
+ {
+ final ReentrantLock lock = this.lock;
+
+ lock.lock();
+ try
+ {
+ if (count == 0)
+ {
+ return null;
+ }
+
+ E x = extract(true, true).getElement();
+
+ return x;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements
+ * are present on this queue.
+ *
+ * @param timeout How long to wait before giving up, in units of <tt>unit</tt>.
+ * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
+ *
+ * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present.
+ *
+ * @throws InterruptedException If interrupted while waiting.
+ */
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try
+ {
+ long nanos = unit.toNanos(timeout);
+
+ do
+ {
+ if (count != 0)
+ {
+ E x = extract(true, true).getElement();
+
+ return x;
+ }
+
+ try
+ {
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ catch (InterruptedException ie)
+ {
+ notEmpty.signal(); // propagate to non-interrupted thread
+ throw ie;
+ }
+ }
+ while (nanos > 0);
+
+ return null;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty.
+ *
+ * @return The head of this queue, or <tt>null</tt> if this queue is empty.
+ */
+ public E peek()
+ {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+
+ try
+ {
+ return peekAtBufferHead();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints)
+ * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit.
+ *
+ * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by
+ * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt>
+ * or <tt>take</tt> an element.
+ *
+ * @return The remaining capacity.
+ */
+ public int remainingCapacity()
+ {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+
+ try
+ {
+ return getBufferCapacity() - count;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Adds the specified element to this queue, waiting if necessary for space to become available.
+ *
+ * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised
+ * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these
+ * exceptions.
+ *
+ * @param e The element to add.
+ *
+ * @throws InterruptedException If interrupted while waiting.
+ */
+ public void put(E e) throws InterruptedException
+ {
+ try
+ {
+ tryPut(e);
+ }
+ catch (SynchException ex)
+ {
+ // This exception is deliberately ignored. See the method comment for information about this.
+ }
+ }
+
+ /**
+ * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
+ * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
+ *
+ * @param e The data element to put into the queue. Cannot be null.
+ *
+ * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
+ * on its entry in the queue being consumed.
+ * @throws SynchException If a consumer encounters an error whilst processing the data element.
+ */
+ public void tryPut(E e) throws InterruptedException, SynchException
+ {
+ if (e == null)
+ {
+ throw new NullPointerException();
+ }
+
+ // final Queue<E> items = this.buffer;
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+
+ try
+ {
+ while (count == getBufferCapacity())
+ {
+ // Release the lock and wait until the queue is not full.
+ notFull.await();
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ notFull.signal(); // propagate to non-interrupted thread
+ throw ie;
+ }
+
+ // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block
+ // the producer until its data is taken.
+ insert(e, true);
+ }
+
+ /**
+ * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
+ * Any producer that has its data element taken by this call will be immediately unblocked. To keep the
+ * producer blocked whilst taking just a single item, use the
+ * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)}
+ * method. There is no take method to do that because there is not usually any advantage in a synchronous hand
+ * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption
+ * latencies accross many producers where possible.
+ *
+ * @return The head of this queue.
+ *
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ public E take() throws InterruptedException
+ {
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+
+ try
+ {
+ try
+ {
+ while (count == 0)
+ {
+ // Release the lock and wait until the queue becomes non-empty.
+ notEmpty.await();
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ notEmpty.signal(); // propagate to non-interrupted thread
+ throw ie;
+ }
+
+ // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is
+ // not full, and unblock the producer that owns the data item that is taken.
+ E x = extract(true, true).getElement();
+
+ return x;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Removes all available elements from this queue and adds them into the given collection. This operation may be
+ * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements
+ * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated
+ * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further,
+ * the behavior of this operation is undefined if the specified collection is modified while the operation is in
+ * progress.
+ *
+ * @param objects The collection to transfer elements into.
+ *
+ * @return The number of elements transferred.
+ *
+ * @throws NullPointerException If objects is null.
+ * @throws IllegalArgumentException If objects is this queue.
+ */
+ public int drainTo(Collection<? super E> objects)
+ {
+ return drainTo(objects, -1);
+ }
+
+ /**
+ * Removes at most the given number of available elements from this queue and adds them into the given collection.
+ * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements
+ * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue
+ * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if
+ * the specified collection is modified while the operation is in progress.
+ *
+ * @param objects The collection to transfer elements into.
+ * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning
+ * all elements.
+ *
+ * @return The number of elements transferred.
+ *
+ * @throws NullPointerException If c is null.
+ * @throws IllegalArgumentException If c is this queue.
+ */
+ public int drainTo(Collection<? super E> objects, int maxElements)
+ {
+ if (objects == null)
+ {
+ throw new NullPointerException();
+ }
+
+ if (objects == this)
+ {
+ throw new IllegalArgumentException();
+ }
+
+ // final Queue<E> items = this.buffer;
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+
+ try
+ {
+ int n = 0;
+
+ for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
+ {
+ // Take items from the queue, do unblock the producers, but don't send not full signals yet.
+ objects.add(extract(true, false).getElement());
+ }
+
+ if (n > 0)
+ {
+ // count -= n;
+ notFull.signalAll();
+ }
+
+ return n;
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Takes all available data items from the queue or blocks until some become available. The returned items
+ * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+ * producers, where the producers are still blocked.
+ *
+ * @param c The collection to drain the data items into.
+ * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+ *
+ * @return A count of the number of elements that were drained from the queue.
+ */
+ public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
+ {
+ return drainTo(c, -1, unblock);
+ }
+
+ /**
+ * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
+ * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
+ * producers, where the producers are still blocked.
+ *
+ * @param coll The collection to drain the data items into.
+ * @param maxElements The maximum number of elements to drain.
+ * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
+ *
+ * @return A count of the number of elements that were drained from the queue.
+ */
+ public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
+ {
+ if (coll == null)
+ {
+ throw new NullPointerException();
+ }
+
+ // final Queue<E> items = this.buffer;
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+
+ try
+ {
+ int n = 0;
+
+ for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
+ {
+ // Extract the next record from the queue, don't signall the not full condition yet and release
+ // producers depending on whether the caller wants to or not.
+ coll.add(extract(false, unblock));
+ }
+
+ if (n > 0)
+ {
+ // count -= n;
+ notFull.signalAll();
+ }
+
+ return new SynchRefImpl(n, coll);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * This abstract method should be overriden to return an empty queue. Different implementations of producer
+ * consumer buffers can control the order in which data is accessed using different queue implementations.
+ * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete
+ * implementations.
+ *
+ * @return An empty queue.
+ */
+ protected abstract <T> Queue<T> createQueue();
+
+ /**
+ * Insert element into the queue, then possibly signal that the queue is not empty and block the producer
+ * on the element until permission to procede is given.
+ *
+ * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process
+ * will be able to get access to the queue. Hence, unlock and block are always set together.
+ *
+ * <p/>Call only when holding the global lock.
+ *
+ * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked.
+ *
+ * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this
+ * method may not return straight away, but only after the producer is unblocked by having its data
+ * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no
+ * matter what value the unlockAndBlock flag has, leaving the global lock on.
+ */
+ protected boolean insert(E x, boolean unlockAndBlock)
+ {
+ // Create a new record for the data item.
+ SynchRecordImpl<E> record = new SynchRecordImpl<E>(x);
+
+ boolean result = buffer.offer(record);
+
+ if (result)
+ {
+ count++;
+
+ // Tell any waiting consumers that the queue is not empty.
+ notEmpty.signal();
+
+ if (unlockAndBlock)
+ {
+ // Allow other threads to read/write the queue.
+ lock.unlock();
+
+ // Wait until a consumer takes this data item.
+ record.waitForConsumer();
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ /**
+ * Extract element at current take position, advance, and signal.
+ *
+ * <p/>Call only when holding lock.
+ */
+ protected SynchRecordImpl<E> extract(boolean unblock, boolean signal)
+ {
+ SynchRecordImpl<E> result = buffer.remove();
+ count--;
+
+ if (signal)
+ {
+ notFull.signal();
+ }
+
+ if (unblock)
+ {
+ result.releaseImmediately();
+ }
+
+ return result;
+ }
+
+ /**
+ * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned.
+ *
+ * <p/>Call only when holding lock.
+ *
+ * @return The maximum capacity of the buffer.
+ */
+ protected int getBufferCapacity()
+ {
+ if (buffer instanceof Capacity)
+ {
+ return ((Capacity) buffer).getCapacity();
+ }
+ else
+ {
+ return Integer.MAX_VALUE;
+ }
+ }
+
+ /**
+ * Return the head element from the buffer.
+ *
+ * <p/>Call only when holding lock.
+ *
+ * @return The head element from the buffer.
+ */
+ protected E peekAtBufferHead()
+ {
+ return buffer.peek().getElement();
+ }
+
+ public class SynchRefImpl implements SynchRef
+ {
+ /** Holds the number of synch records associated with this reference. */
+ int numRecords;
+
+ /** Holds a reference to the collection of synch records managed by this. */
+ Collection<SynchRecord<E>> records;
+
+ public SynchRefImpl(int n, Collection<SynchRecord<E>> records)
+ {
+ this.numRecords = n;
+ this.records = records;
+ }
+
+ public int getNumRecords()
+ {
+ return numRecords;
+ }
+
+ /**
+ * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked
+ * when this method is called. The exception to this is producers that have had their data put back onto the queue
+ * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked
+ * but will not return from their put call normally, but with an exception instead.
+ */
+ public void unblockProducers()
+ {
+ log.debug("public void unblockProducers(): called");
+
+ if (records != null)
+ {
+ for (SynchRecord<E> record : records)
+ {
+ // This call takes account of items that have already been released, are to be requeued or are in
+ // error.
+ record.releaseImmediately();
+ }
+ }
+
+ records = null;
+ }
+ }
+
+ /**
+ * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows
+ * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when
+ * its data cannot be consumed.
+ */
+ public class SynchRecordImpl<E> implements SynchRecord<E>
+ {
+ /** A boolean latch that determines when the producer for this data item will be allowed to continue. */
+ BooleanLatch latch = new BooleanLatch();
+
+ /** The data element associated with this item. */
+ E element;
+
+ /**
+ * Create a new synch record.
+ *
+ * @param e The data element that the record encapsulates.
+ */
+ public SynchRecordImpl(E e)
+ {
+ // Keep the data element.
+ element = e;
+ }
+
+ /**
+ * Waits until the producer is given permission to proceded by a consumer.
+ */
+ public void waitForConsumer()
+ {
+ latch.await();
+ }
+
+ /**
+ * Gets the data element contained by this record.
+ *
+ * @return The data element contained by this record.
+ */
+ public E getElement()
+ {
+ return element;
+ }
+
+ /**
+ * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
+ * producers to a minimum by using this method to release them at the earliest possible moment when batch
+ * consuming records from sychronized producers.
+ */
+ public void releaseImmediately()
+ {
+ // Check that the record has not already been released, is in error or is to be requeued.
+ latch.signal();
+
+ // Propagate errors to the producer.
+
+ // Requeue items to be requeued.
+ }
+
+ /**
+ * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
+ * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or
+ * the {@link #releaseImmediately()} method.
+ *
+ * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
+ * element has already been unblocked.
+ */
+ public void reQueue()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ /**
+ * Tells the synch queue to raise an exception with this elements producer. The exception is not raised
+ * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the
+ * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is
+ * raised on the producer.
+ *
+ * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
+ * because the exception is to be passed onto a different thread.
+ *
+ * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
+ * element has already been unblocked.
+ *
+ * @param e The exception to raise on the producer.
+ */
+ public void inError(Exception e)
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
new file mode 100644
index 0000000000..0e4a07594f
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
@@ -0,0 +1,128 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * A BooleanLatch is like a set of traffic lights, where threads can wait at a red light until another thread gives
+ * the green light. When threads arrive at the latch it is initially red. They queue up until the green signal is
+ * given, at which point they can all acquire the latch in shared mode and continue to run concurrently. Once the latch
+ * is signalled it cannot be reset to red again.
+ *
+ * <p/> The latch uses a {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} to implement its synchronization.
+ * This has two internal states, 0 which means that the latch is blocked, and 1 which means that the latch is open.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Block threads until a go signal is given.
+ * </table>
+ *
+ * @todo Might be better to use a countdown latch to count down from 1. Its await method can throw interrupted
+ * exception which makes the possibility of interruption more explicit, and provides a reminder to recheck the
+ * latch condition before continuing.
+ */
+public class BooleanLatch
+{
+ /** Holds the synchronizer that provides the thread queueing synchronization. */
+ private final Sync sync = new Sync();
+
+ /**
+ * Tests whether or not the latch has been signalled, that is to say that, the light is green.
+ *
+ * <p/>This method is non-blocking.
+ *
+ * @return <tt>true</tt> if the latch may be acquired; the light is green.
+ */
+ public boolean isSignalled()
+ {
+ return sync.isSignalled();
+ }
+
+ /**
+ * Waits on the latch until the signal is given and the light is green. If the light is already green then the
+ * latch will be acquired and the thread will not have to wait.
+ *
+ * <p/>This method will block until the go signal is given or the thread is otherwise interrupted. Before carrying
+ * out any processing threads that return from this method should confirm that the go signal has really been given
+ * on this latch by calling the {@link #isSignalled()} method.
+ */
+ public void await()
+ {
+ sync.acquireShared(1);
+ }
+
+ /**
+ * Releases any threads currently waiting on the latch. This flips the light to green allowing any threads that
+ * were waiting for this condition to now run.
+ *
+ * <p/>This method is non-blocking.
+ */
+ public void signal()
+ {
+ sync.releaseShared(1);
+ }
+
+ /**
+ * Implements a thread queued synchronizer. The internal state 0 means that the queue is blocked and the internl
+ * state 1 means that the queue is released and that all waiting threads can acquire the synchronizer in shared
+ * mode.
+ */
+ private static class Sync extends AbstractQueuedSynchronizer
+ {
+ /**
+ * Attempts to acquire this synchronizer in shared mode. It may be acquired once it has been released.
+ *
+ * @param ignore This parameter is ignored.
+ *
+ * @return 1 if the shared acquisition succeeds and -1 if it fails.
+ */
+ protected int tryAcquireShared(int ignore)
+ {
+ return isSignalled() ? 1 : -1;
+ }
+
+ /**
+ * Releases the synchronizer, setting its internal state to 1.
+ *
+ * @param ignore This parameter is ignored.
+ *
+ * @return <tt>true</tt> always.
+ */
+ protected boolean tryReleaseShared(int ignore)
+ {
+ setState(1);
+
+ return true;
+ }
+
+ /**
+ * Tests if the synchronizer is signalled. It is signalled when its internal state it 1.
+ *
+ * @return <tt>true</tt> if the internal state is 1, <tt>false</tt> otherwise.
+ */
+ boolean isSignalled()
+ {
+ return getState() != 0;
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java
new file mode 100644
index 0000000000..a97ce0e172
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java
@@ -0,0 +1,35 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * An interface exposed by data structures that have a maximum capacity.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Report the maximum capacity.
+ * </table>
+ */
+public interface Capacity
+{
+ public int getCapacity();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java
new file mode 100644
index 0000000000..bc63eb0353
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java
@@ -0,0 +1,50 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Queue;
+
+/**
+ * SynchBuffer completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying
+ * queue as an array. This uses FIFO ordering for the queue but restricts the maximum size of the queue to a fixed
+ * amount. It also has the advantage that, as the buffer does not grow and shrink dynamically, memory for the buffer
+ * is allocated up front and does not create garbage during the operation of the queue.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide array based FIFO queue to create a batch synched queue around.
+ * </table>
+ *
+ * @todo Write an array based buffer implementation that implements Queue.
+ */
+public class SynchBuffer<E> extends BatchSynchQueueBase<E>
+{
+ /**
+ * Returns an empty queue, implemented as an array.
+ *
+ * @return An empty queue, implemented as an array.
+ */
+ protected <T> Queue<T> createQueue()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java
new file mode 100644
index 0000000000..99a83f96cd
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java
@@ -0,0 +1,52 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * SynchException is used to encapsulate exceptions with the data elements that caused them in order to send exceptions
+ * back from the consumers of a {@link BatchSynchQueue} to producers. The underlying exception should be retrieved from
+ * the {@link #getCause} method.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Encapsulate a data element and exception.
+ * </table>
+ */
+public class SynchException extends Exception
+{
+ /** Holds the data element that is in error. */
+ Object element;
+
+ /**
+ * Creates a new BaseApplicationException object.
+ *
+ * @param message The exception message.
+ * @param cause The underlying throwable cause. This may be null.
+ */
+ public SynchException(String message, Throwable cause, Object element)
+ {
+ super(message, cause);
+
+ // Keep the data element that was in error.
+ this.element = element;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java
new file mode 100644
index 0000000000..95833f398a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java
@@ -0,0 +1,48 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * SynchQueue completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying
+ * queue as a linked list. This uses FIFO ordering for the queue and allows the queue to grow to accomodate more
+ * elements as needed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide linked list FIFO queue to create a batch synched queue around.
+ * </table>
+ */
+public class SynchQueue<E> extends BatchSynchQueueBase<E>
+{
+ /**
+ * Returns an empty queue, implemented as a linked list.
+ *
+ * @return An empty queue, implemented as a linked list.
+ */
+ protected <T> Queue<T> createQueue()
+ {
+ return new LinkedList<T>();
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java
new file mode 100644
index 0000000000..fd740c20cd
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java
@@ -0,0 +1,74 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * SynchRecord associates a data item from a {@link BatchSynchQueue} with its producer. This enables the data item data
+ * item to be put back on the queue without unblocking its producer, or to send exceptions to the producer.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Get the underlying data element.
+ * <tr><td> Put the data element back on the queue without unblocking its producer.
+ * <tr><td> Send and exception to the data elements producer.
+ * </table>
+ */
+public interface SynchRecord<E>
+{
+ /**
+ * Gets the data element contained by this record.
+ *
+ * @return The data element contained by this record.
+ */
+ public E getElement();
+
+ /**
+ * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
+ * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method.
+ *
+ * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element
+ * has already been unblocked.
+ */
+ public void reQueue();
+
+ /**
+ * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
+ * producers to a minimum by using this method to release them at the earliest possible moment when batch
+ * consuming records from sychronized producers.
+ */
+ public void releaseImmediately();
+
+ /**
+ * Tells the synch queue to raise an exception with this elements producer. The exception is not raised immediately
+ * but upon calling the {@link SynchRef#unblockProducers()} method. The exception will be wrapped in a
+ * {@link SynchException} before it is raised on the producer.
+ *
+ * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
+ * because the exception is to be passed onto a different thread.
+ *
+ * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element
+ * has already been unblocked.
+ *
+ * @param e The exception to raise on the producer.
+ */
+ public void inError(Exception e);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java
new file mode 100644
index 0000000000..efe2344c06
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java
@@ -0,0 +1,51 @@
+package org.apache.qpid.util.concurrent;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * A SynchRef is an interface which is returned from the synchronous take and drain methods of {@link BatchSynchQueue},
+ * allowing call-backs to be made against the synchronizing strucutre. It allows the consumer to communicate when it
+ * wants producers that have their data taken to be unblocked.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Report number of records returned by a taking operation.
+ * <tr><td> Provide call-back to release producers of taken records.
+ * </table>
+ */
+public interface SynchRef
+{
+ /**
+ * Reports the number of records taken by the take or drain operation.
+ *
+ * @return The number of records taken by the take or drain operation.
+ */
+ public int getNumRecords();
+
+ /**
+ * Any producers that have had their data elements taken from the queue but have not been unblocked are
+ * unblocked when this method is called. The exception to this is producers that have had their data put back
+ * onto the queue by a consumer. Producers that have had exceptions for their data items registered by consumers
+ * will be unblocked but will not return from their put call normally, but with an exception instead.
+ */
+ public void unblockProducers();
+}