diff options
13 files changed, 728 insertions, 185 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index d8aa9bf5ca..9ed915cc35 100644 --- a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -14,27 +14,46 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.common; import org.apache.qpid.framing.AMQShortString; +/** + * Specifies the different filter types for consumers that filter their messages. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represent different consumer filter types. + * </table> + */ public enum AMQPFilterTypes { JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), AUTO_CLOSE("x-filter-auto-close"); + /** The identifying string for the filter type. */ private final AMQShortString _value; + /** + * Creates a new filter type from its identifying string. + * + * @param value The identifying string. + */ AMQPFilterTypes(String value) { _value = new AMQShortString(value); } + /** + * Gets the identifying string of the filter type. + * + * @return The identifying string of the filter type. + */ public AMQShortString getValue() { return _value; diff --git a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java index 07371b5182..67f16e6a87 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java @@ -14,12 +14,20 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.common; +/** + * Specifies the available client property types that different clients can use to identify themselves with. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Specify the available client property types. + * </table> + */ public enum ClientProperties { instance, diff --git a/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java index 80377ffdf5..5a357557ca 100644 --- a/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java +++ b/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java @@ -14,9 +14,9 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.common; @@ -27,23 +27,56 @@ import java.util.Properties; import org.apache.log4j.Logger; +/** + * QpidProperties captures the project name, version number, and source code repository revision number from a properties + * file which is generated as part of the build process. Normally, the name and version number are pulled from the module + * name and version number of the Maven build POM, but could come from other sources if the build system is changed. The + * idea behind this, is that every build has these values incorporated directly into its jar file, so that code in the + * wild can be identified, should its origination be forgotten. + * + * <p/>To get the build version of any Qpid code call the {@link #main} method. This version string is usually also + * printed to the console on broker start up. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><td>Load build versioning information into the runtime, for code identification purposes. + * </table> + * + * @todo Code to locate/load/log properties can be factored into a reusable properties utils class. Avoid having this + * same snippet of loading code scattered in many places. + * + * @todo Could also add a build number property for a sequential build number assigned by an automated build system, for + * build reproducability purposes. + */ public class QpidProperties { + /** Used for debugging purposes. */ private static final Logger _logger = Logger.getLogger(QpidProperties.class); - + + /** The name of the version properties file to load from the class path. */ public static final String VERSION_RESOURCE = "qpidversion.properties"; + /** Defines the name of the product property. */ public static final String PRODUCT_NAME_PROPERTY = "qpid.name"; + + /** Defines the name of the version property. */ public static final String RELEASE_VERSION_PROPERTY = "qpid.version"; + + /** Defines the name of the source code revision property. */ public static final String BUILD_VERSION_PROPERTY = "qpid.svnversion"; + /** Defines the default value for all properties that cannot be loaded. */ private static final String DEFAULT = "unknown"; + /** Holds the product name. */ private static String productName = DEFAULT; + + /** Holds the product version. */ private static String releaseVersion = DEFAULT; + + /** Holds the source code revision. */ private static String buildVersion = DEFAULT; - /** Loads the values from the version properties file. */ + // Loads the values from the version properties file. static { Properties props = new Properties(); @@ -62,16 +95,17 @@ public class QpidProperties if (_logger.isDebugEnabled()) { _logger.debug("Dumping QpidProperties"); - for (Map.Entry<Object,Object> entry : props.entrySet()) + for (Map.Entry<Object, Object> entry : props.entrySet()) { - _logger.debug("Property: " + entry.getKey() + " Value: "+ entry.getValue()); + _logger.debug("Property: " + entry.getKey() + " Value: " + entry.getValue()); } + _logger.debug("End of property dump"); } productName = readPropertyValue(props, PRODUCT_NAME_PROPERTY); releaseVersion = readPropertyValue(props, RELEASE_VERSION_PROPERTY); - buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY); + buildVersion = readPropertyValue(props, BUILD_VERSION_PROPERTY); } } catch (IOException e) @@ -81,26 +115,56 @@ public class QpidProperties } } + /** + * Gets the product name. + * + * @return The product name. + */ public static String getProductName() { return productName; } + /** + * Gets the product version. + * + * @return The product version. + */ public static String getReleaseVersion() { return releaseVersion; } + /** + * Gets the source code revision. + * + * @return The source code revision. + */ public static String getBuildVersion() { return buildVersion; } + /** + * Extracts all of the version information as a printable string. + * + * @return All of the version information as a printable string. + */ public static String getVersionString() { return getProductName() + " - " + getReleaseVersion() + " build: " + getBuildVersion(); } + /** + * Helper method to extract a named property from properties. + * + * @param props The properties. + * @param propertyName The named property to extract. + * + * @return The extracted property or a default value if the properties do not contain the named property. + * + * @todo A bit pointless. + */ private static String readPropertyValue(Properties props, String propertyName) { String retVal = (String) props.get(propertyName); @@ -108,9 +172,16 @@ public class QpidProperties { retVal = DEFAULT; } + return retVal; } + /** + * Prints the versioning information to the console. This is extremely usefull for identifying Qpid code in the + * wild, where the origination of the code has been forgotten. + * + * @param args Does not require any arguments. + */ public static void main(String[] args) { System.out.println(getVersionString()); diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Configured.java b/java/common/src/main/java/org/apache/qpid/configuration/Configured.java index 7d2e7d3a5d..22903888fe 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/Configured.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/Configured.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,7 +26,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Marks a field as being "configured" externally. + * Marks a field as having a "configured" value injected into it by a configurator. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java index 022e7b8a76..76225778e3 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java @@ -24,7 +24,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; /** - * Indicates an error parsing a property expansion. + * Indicates a failure to parse a property expansion. See {@link PropertyUtils} for the code that does property + * expansions. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaboration + * <tr><td> Represent failure to expand a property name into a value. + * </table> + * + * @todo AMQException is to be reserved for protocol related conditions. This exception does not have a status code, so + * don't inherit from AMQException. */ public class PropertyException extends AMQException { @@ -33,6 +42,7 @@ public class PropertyException extends AMQException super(message); } + /* public PropertyException(String msg, Throwable t) { super(msg, t); @@ -47,4 +57,5 @@ public class PropertyException extends AMQException { super(errorCode, msg); } + */ } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java index 37d8af2501..b3c310d23c 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,24 +24,35 @@ import java.util.ArrayList; import java.util.Iterator; /** - * Based on code in Apache Ant, this utility class handles property expansion. This - * is most useful in config files and so on. + * PropertyUtils provides helper methods for dealing with Java properties. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Expand system properties into strings with named expansions. + * </table> + * + * @todo Make the lookup method generic by passing in the properties to use for the expansion, rather than hard coding + * as system properties. The expansion code has greater potential for re-use that way. + * + * @todo Some more property related code could be added to this utils class, which might more appropriately reside under + * org.apache.qpid.util. For example standardised code to load properties from a resource name, currently found in + * QpidProperties and possibly other places could be moved here. */ public class PropertyUtils { /** - * Replaces <code>${xxx}</code> style constructions in the given value - * with the string value of the corresponding data types. Replaces only system - * properties + * Given a string that contains substrings of the form <code>${xxx}</code>, looks up the valuea of 'xxx' as a + * system properties and substitutes tham back into the original string, to provide a property value expanded + * string. * - * @param value The string to be scanned for property references. - * May be <code>null</code>, in which case this + * @param value The string to be scanned for property references. May be <code>null</code>, in which case this * method returns immediately with no effect. - * @return the original string with the properties replaced, or - * <code>null</code> if the original string is <code>null</code>. - * @throws PropertyException if the string contains an opening - * <code>${</code> without a closing - * <code>}</code> + * + * @return The original string with the properties replaced, or <code>null</code> if the original string is + * <code>null</code>. + * + * @throws PropertyException If the string contains an opening <code>${</code> without a balancing <code>}</code>, + * or if the property to expand does not exist as a system property. */ public static String replaceProperties(String value) throws PropertyException { @@ -69,11 +80,12 @@ public class PropertyUtils if (replacement == null) { - throw new PropertyException("Property ${" + propertyName + - "} has not been set"); + throw new PropertyException("Property ${" + propertyName + "} has not been set"); } + fragment = replacement; } + sb.append(fragment); } @@ -81,32 +93,30 @@ public class PropertyUtils } /** - * Default parsing method. Parses the supplied value for properties which are specified - * using ${foo} syntax. $X is left as is, and $$ specifies a single $. - * @param value the property string to parse - * @param fragments is populated with the string fragments. A null means "insert a - * property value here. The number of nulls in the list when populated is equal to the - * size of the propertyRefs list - * @param propertyRefs populated with the property names to be added into the final - * String. + * Parses the supplied value for properties which are specified using ${foo} syntax. $X is left as is, and $$ + * specifies a single $. + * + * @param value The property string to parse. + * @param fragments Is populated with the string fragments. A null means "insert a property value here. The number + * of nulls in the list when populated is equal to the size of the propertyRefs list. + * @param propertyRefs Populated with the property names to be added into the final string. */ - private static void parsePropertyString(String value, ArrayList<String> fragments, - ArrayList<String> propertyRefs) - throws PropertyException + private static void parsePropertyString(String value, ArrayList<String> fragments, ArrayList<String> propertyRefs) + throws PropertyException { int prev = 0; int pos; - //search for the next instance of $ from the 'prev' position + // search for the next instance of $ from the 'prev' position while ((pos = value.indexOf("$", prev)) >= 0) { - //if there was any text before this, add it as a fragment + // if there was any text before this, add it as a fragment if (pos > 0) { fragments.add(value.substring(prev, pos)); } - //if we are at the end of the string, we tack on a $ - //then move past it + // if we are at the end of the string, we tack on a $ + // then move past it if (pos == (value.length() - 1)) { fragments.add("$"); @@ -114,8 +124,8 @@ public class PropertyUtils } else if (value.charAt(pos + 1) != '{') { - //peek ahead to see if the next char is a property or not - //not a property: insert the char as a literal + // peek ahead to see if the next char is a property or not + // not a property: insert the char as a literal if (value.charAt(pos + 1) == '$') { // two $ map to one $ @@ -135,22 +145,20 @@ public class PropertyUtils int endName = value.indexOf('}', pos); if (endName < 0) { - throw new PropertyException("Syntax error in property: " + - value); + throw new PropertyException("Syntax error in property: " + value); } + String propertyName = value.substring(pos + 2, endName); fragments.add(null); propertyRefs.add(propertyName); prev = endName + 1; } } - //no more $ signs found - //if there is any tail to the file, append it + // no more $ signs found + // if there is any tail to the file, append it if (prev < value.length()) { fragments.add(value.substring(prev)); } } - - } diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 9bb03c5df6..123901b577 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,25 +22,44 @@ package org.apache.qpid.exchange; import org.apache.qpid.framing.AMQShortString; +/** + * Defines the names of the standard AMQP exchanges that every AMQP broker should provide. These exchange names + * and type are given in the specification. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Defines the standard AMQP exchange names. + * <tr><td> Defines the standard AMQP exchange types. + * </table> + * + * @todo A type safe enum, might be more appropriate for the exchange types. + */ public class ExchangeDefaults { - public final static AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>"); - - public final static AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic"); - - public final static AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic"); + /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */ + public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>"); - public final static AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct"); + /** The pre-defined topic exchange, the broker SHOULD provide this. */ + public static final AMQShortString TOPIC_EXCHANGE_NAME = new AMQShortString("amq.topic"); - public final static AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct"); + /** Defines the identifying type name of topic exchanges. */ + public static final AMQShortString TOPIC_EXCHANGE_CLASS = new AMQShortString("topic"); - public final static AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match"); + /** The pre-defined direct exchange, the broker MUST provide this. */ + public static final AMQShortString DIRECT_EXCHANGE_NAME = new AMQShortString("amq.direct"); - public final static AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers"); + /** Defines the identifying type name of direct exchanges. */ + public static final AMQShortString DIRECT_EXCHANGE_CLASS = new AMQShortString("direct"); - public final static AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout"); + /** The pre-defined headers exchange, the specification does not say this needs to be provided. */ + public static final AMQShortString HEADERS_EXCHANGE_NAME = new AMQShortString("amq.match"); - public final static AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); + /** Defines the identifying type name of headers exchanges. */ + public static final AMQShortString HEADERS_EXCHANGE_CLASS = new AMQShortString("headers"); + /** The pre-defined fanout exchange, the boker MUST provide this. */ + public static final AMQShortString FANOUT_EXCHANGE_NAME = new AMQShortString("amq.fanout"); + /** Defines the identifying type name of fanout exchanges. */ + public static final AMQShortString FANOUT_EXCHANGE_CLASS = new AMQShortString("fanout"); } diff --git a/java/common/src/main/java/org/apache/qpid/pool/Event.java b/java/common/src/main/java/org/apache/qpid/pool/Event.java index 7300ec8c3f..5996cbf89c 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,18 +23,51 @@ package org.apache.qpid.pool; import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoSession; - -abstract public class Event +/** + * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain + * for later processing. It is an abstract class, with different implementations for continuations of different kinds + * of Mina events. + * + * <p/>These continuations are typically batched by {@link Job} for processing by a worker thread pool. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Process a continuation in the context of a Mina session. + * </table> + * + * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static + * to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of + * the context of the outer Event class. The inner class construction used here is preventing common code re-use + * (though not by a huge amount), but makes for an inelegent way of handling inheritance and doesn't seem like + * a justifiable use of inner classes. Move the inner classes out into their own files. + * + * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as + * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, + * it is really an interface, so could just drop it and use the continuation interface instead. + */ +public abstract class Event { - + /** + * Creates a continuation. + */ public Event() - { - } - - - abstract public void process(IoSession session); - - + { } + + /** + * Processes the continuation in the context of a Mina session. + * + * @param session The Mina session. + */ + public abstract void process(IoSession session); + + /** + * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} + * </table> + */ public static final class ReceivedEvent extends Event { private final Object _data; @@ -59,7 +92,15 @@ abstract public class Event } } - + /** + * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Pass a Mina filterWrite event to a NextFilter. + * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} + * </table> + */ public static final class WriteEvent extends Event { private final IoFilter.WriteRequest _data; @@ -72,7 +113,6 @@ abstract public class Event _data = data; } - public void process(IoSession session) { _nextFilter.filterWrite(session, _data); @@ -84,8 +124,14 @@ abstract public class Event } } - - + /** + * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Pass a Mina sessionClosed event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession} + * </table> + */ public static final class CloseEvent extends Event { private final IoFilter.NextFilter _nextFilter; @@ -96,7 +142,6 @@ abstract public class Event _nextFilter = nextFilter; } - public void process(IoSession session) { _nextFilter.sessionClosed(session); @@ -107,5 +152,4 @@ abstract public class Event return _nextFilter; } } - } diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index 1dafdaf4fd..ba3c5d03fa 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,48 +26,77 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.common.IoSession; /** - * Holds events for a session that will be processed asynchronously by - * the thread pool in PoolingFilter. + * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation. + * The {@link Event}s themselves provide methods to process themselves, so processing a job simply consists of sequentially + * processing all of its aggregated events. + * + * The constructor accepts a maximum number of events for the job, and only runs up to that maximum number when + * processing the job, but the add method does not enforce this maximum. In other words, not all the enqueued events + * may be processed in each run of the job, several runs may be required to clear the queue. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Aggregate many coninuations together into a single continuation. + * <tr><td> Sequentially process aggregated continuations. <td> {@link Event} + * <tr><td> Provide running and completion status of the aggregate continuation. + * <tr><td> Execute a terminal continuation upon job completion. <td> {@link JobCompletionHandler} + * </table> + * + * @todo Could make Job implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as a + * continuation. Job is a continuation that aggregates other continuations and as such is a usefull re-usable + * piece of code. There may be other palces than the mina filter chain where continuation batching is used within + * qpid, so abstracting this out could provide a usefull building block. This also opens the way to different + * kinds of job with a common interface, e.g. parallel or sequential jobs etc. + * + * @todo For better re-usability could make the completion handler optional. Only run it when one is set. */ public class Job implements Runnable { + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; + + /** The Mina session. */ private final IoSession _session; + + /** Holds the queue of events that make up the job. */ private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + + /** Holds a status flag, that indicates when the job is actively running. */ private final AtomicBoolean _active = new AtomicBoolean(); - //private final AtomicInteger _refCount = new AtomicInteger(); + + /** Holds the completion continuation, called upon completion of a run of the job. */ private final JobCompletionHandler _completionHandler; + /** + * Creates a new job that aggregates many continuations together. + * + * @param session The Mina session. + * @param completionHandler The per job run, terminal continuation. + * @param maxEvents The maximum number of aggregated continuations to process per run of the job. + */ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) { _session = session; _completionHandler = completionHandler; _maxEvents = maxEvents; } -// -// void acquire() -// { -// _refCount.incrementAndGet(); -// } -// -// void release() -// { -// _refCount.decrementAndGet(); -// } -// -// boolean isReferenced() -// { -// return _refCount.get() > 0; -// } + /** + * Enqueus a continuation for sequential processing by this job. + * + * @param evt The continuation to enqueue. + */ void add(Event evt) { _eventQueue.add(evt); } + /** + * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job. + */ void processAll() { - //limit the number of events processed in one run + // limit the number of events processed in one run for (int i = 0; i < _maxEvents; i++) { Event e = _eventQueue.poll(); @@ -82,21 +111,37 @@ public class Job implements Runnable } } - boolean isComplete() + /** + * Tests if there are no more enqueued continuations to process. + * + * @return <tt>true</tt> if there are no enqueued continuations in this job, <tt>false</tt> otherwise. + */ + public boolean isComplete() { return _eventQueue.peek() == null; } - boolean activate() + /** + * Marks this job as active if it is inactive. This method is thread safe. + * + * @return <tt>true</tt> if this job was inactive and has now been marked as active, <tt>false</tt> otherwise. + */ + public boolean activate() { return _active.compareAndSet(false, true); } - void deactivate() + /** + * Marks this job as inactive. This method is thread safe. + */ + public void deactivate() { _active.set(false); } + /** + * Processes a batch of aggregated continuations, marks this job as inactive and call the terminal continuation. + */ public void run() { processAll(); @@ -104,7 +149,12 @@ public class Job implements Runnable _completionHandler.completed(_session, this); } - + /** + * Another interface for a continuation. + * + * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask, + * Runnable or a custom Continuation interface. + */ static interface JobCompletionHandler { public void completed(IoSession session, Job job); diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index c9c96925cb..90c09ea99e 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -31,22 +31,131 @@ import org.apache.mina.common.IoSession; import org.apache.qpid.pool.Event.CloseEvent; -public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler +/** + * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it + * adds no behaviour by default to the filter chain, it is abstract. + * + * <p/>PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by + * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is + * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as + * a buffer between stages of the pipeline. + * + * <p/>There are two convenience methods, {@link #createAynschReadPoolingFilter} and + * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and + * 'filterWrite' events, making it possible to process these event streams seperately. + * + * <p/>Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the + * Mina session they are working with, and store it in the session against their identifying name. This allows different + * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their + * workloads in different jobs. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Implement default, pass through filter. + * <tr><td> Create pooling filters and a specific thread pool. <td> {@link ReferenceCountingExecutorService} + * <tr><td> Provide the ability to batch Mina events for asynchronous processing. <td> {@link Job}, {@link Event} + * <tr><td> Provide a terminal continuation to keep jobs running till empty. + * <td> {@link Job}, {@link Job.JobCompletionHandler} + * </table> + * + * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events. + * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread + * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads + * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case? + * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these + * stages. + * + * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in + * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run + * and trips off another batch of 10 until they are all done. Why not just have a straight forward + * consumer/producer queue scenario without the batches of 10? + * + * @todo The static helper methods are pointless. Could just call new. + */ +public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler { + /** Used for debugging purposes. */ private static final Logger _logger = Logger.getLogger(PoolingFilter.class); + /** Holds a mapping from Mina sessions to batched jobs for execution. */ private final ConcurrentMap<IoSession, Job> _jobs = new ConcurrentHashMap<IoSession, Job>(); + + /** Holds the managed reference to obtain the executor for the batched jobs. */ private final ReferenceCountingExecutorService _poolReference; + /** Used to hold a name for identifying differeny pooling filter types. */ private final String _name; + + /** Defines the maximum number of events that will be batched into a single job. */ private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + /** + * Creates a named pooling filter, on the specified shared thread pool. + * + * @param refCountingPool The thread pool reference. + * @param name The identifying name of the filter type. + */ public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { _poolReference = refCountingPool; _name = name; } + /** + * Helper method to get an instance of a pooling filter that handles read events asynchronously. + * + * @param refCountingPool A managed reference to the thread pool. + * @param name The filter types identifying name. + * + * @return A pooling filter for asynchronous read events. + */ + public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + return new AsynchReadPoolingFilter(refCountingPool, name); + } + + /** + * Helper method to get an instance of a pooling filter that handles write events asynchronously. + * + * @param refCountingPool A managed reference to the thread pool. + * @param name The filter types identifying name. + * + * @return A pooling filter for asynchronous write events. + */ + public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + { + return new AsynchWritePoolingFilter(refCountingPool, name); + } + + /** + * Called by Mina to initialize this filter. Takes a reference to the thread pool. + */ + public void init() + { + _logger.info("Init called on PoolingFilter " + toString()); + + // Called when the filter is initialised in the chain. If the reference count is + // zero this acquire will initialise the pool. + _poolReference.acquireExecutorService(); + } + + /** + * Called by Mina to clean up this filter. Releases the reference to the thread pool. + */ + public void destroy() + { + _logger.info("Destroy called on PoolingFilter " + toString()); + + // When the reference count gets to zero we release the executor service. + _poolReference.releaseExecutorService(); + } + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param session The Mina session to work in. + * @param event The event to hand off asynchronously. + */ void fireAsynchEvent(IoSession session, Event event) { Job job = getJobForSession(session); @@ -62,31 +171,50 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH } + /** + * Creates a Job on the Mina session, identified by this filters name, in which this filter places asynchronously + * handled events. + * + * @param session The Mina session. + */ public void createNewJobForSession(IoSession session) { Job job = new Job(session, this, _maxEvents); session.setAttribute(_name, job); } + /** + * Retrieves this filters Job, by this filters name, from the Mina session. + * + * @param session The Mina session. + * + * @return The Job for this filter to place asynchronous events into. + */ private Job getJobForSession(IoSession session) { return (Job) session.getAttribute(_name); } - private Job createJobForSession(IoSession session) + /*private Job createJobForSession(IoSession session) { return addJobForSession(session, new Job(session, this, _maxEvents)); - } + }*/ - private Job addJobForSession(IoSession session, Job job) + /*private Job addJobForSession(IoSession session, Job job) { // atomic so ensures all threads agree on the same job Job existing = _jobs.putIfAbsent(session, job); return (existing == null) ? job : existing; - } - - // Job.JobCompletionHandler + }*/ + + /** + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. + * + * @param session The Mina session to work in. + * @param job The job that completed. + */ public void completed(IoSession session, Job job) { // if (job.isComplete()) @@ -109,129 +237,228 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH } } - // IoFilter methods that are processed by threads on the pool - + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception { nextFilter.sessionOpened(session); } + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception { nextFilter.sessionClosed(session); } + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * @param status The session idle status. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception { nextFilter.sessionIdle(session, status); } + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * @param cause The underlying exception. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception { nextFilter.exceptionCaught(session, cause); } + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * @param message The message received. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception { nextFilter.messageReceived(session, message); } + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * @param message The message sent. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception { nextFilter.messageSent(session, message); } + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * @param writeRequest The write request event. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) throws Exception { nextFilter.filterWrite(session, writeRequest); } - // IoFilter methods that are processed on current thread (NOT on pooled thread) - + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { nextFilter.filterClose(session); } - public void sessionCreated(NextFilter nextFilter, IoSession session) + /** + * No-op pass through filter to the next filter in the chain. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * + * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow + * overriding sub-classes the ability to. + */ + public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { nextFilter.sessionCreated(session); } + /** + * Prints the filter types identifying name to a string, mainly for debugging purposes. + * + * @return The filter types identifying name. + */ public String toString() { return _name; } - // LifeCycle methods - - public void init() - { - _logger.info("Init called on PoolingFilter " + toString()); - // called when the filter is initialised in the chain. If the reference count is - // zero this acquire will initialise the pool - _poolReference.acquireExecutorService(); - } - - public void destroy() - { - _logger.info("Destroy called on PoolingFilter " + toString()); - // when the reference count gets to zero we release the executor service - _poolReference.releaseExecutorService(); - } - + /** + * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events + * asynchronously. + */ public static class AsynchReadPoolingFilter extends PoolingFilter { - + /** + * Creates a pooling filter that handles read events asynchronously. + * + * @param refCountingPool A managed reference to the thread pool. + * @param name The filter types identifying name. + */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { super(refCountingPool, name); } - public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) - throws Exception + /** + * Hands off this event for asynchronous execution. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * @param message The message received. + */ + public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); } - public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception + /** + * Hands off this event for asynchronous execution. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + */ + public void sessionClosed(final NextFilter nextFilter, final IoSession session) { fireAsynchEvent(session, new CloseEvent(nextFilter)); } - } + /** + * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events + * asynchronously. + */ public static class AsynchWritePoolingFilter extends PoolingFilter { - + /** + * Creates a pooling filter that handles write events asynchronously. + * + * @param refCountingPool A managed reference to the thread pool. + * @param name The filter types identifying name. + */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { super(refCountingPool, name); } + /** + * Hands off this event for asynchronous execution. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + * @param writeRequest The write request event. + */ public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) - throws Exception { fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); } - public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception + /** + * Hands off this event for asynchronous execution. + * + * @param nextFilter The next filter in the chain. + * @param session The Mina session. + */ + public void sessionClosed(final NextFilter nextFilter, final IoSession session) { fireAsynchEvent(session, new CloseEvent(nextFilter)); } - - } - - public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchReadPoolingFilter(refCountingPool, name); - } - - public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchWritePoolingFilter(refCountingPool, name); } - } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java index 278a569715..8cea70e597 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,14 +24,34 @@ import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.ThreadModel; import org.apache.mina.filter.ReferenceCountingIoFilter; +/** + * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with seperate filters to + * handle read and write events. The seperate filters are {@link PoolingFilter}s, which have thread pools to handle + * these events. The effect of this is that reading and writing may happen concurrently. + * + * <p/>Socket i/o will only happen with concurrent reads and writes if Mina has seperate selector threads for each. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create a filter chain with seperate read and write thread pools for read/write Mina events. + * <td> {@link PoolingFilter} + * </table> + */ public class ReadWriteThreadModel implements ThreadModel { - + /** Holds the singleton instance of this factory. */ private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); + /** Holds the thread pooling filter for reads. */ private final PoolingFilter _asynchronousReadFilter; + + /** Holds the thread pooloing filter for writes. */ private final PoolingFilter _asynchronousWriteFilter; + /** + * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that + * only a singleton instance of the factory is ever created. + */ private ReadWriteThreadModel() { final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); @@ -39,25 +59,44 @@ public class ReadWriteThreadModel implements ThreadModel _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); } + /** + * Gets the singleton instance of this filter chain factory. + * + * @return The singleton instance of this filter chain factory. + */ + public static ReadWriteThreadModel getInstance() + { + return _instance; + } + + /** + * Gets the read filter. + * + * @return The read filter. + */ public PoolingFilter getAsynchronousReadFilter() { return _asynchronousReadFilter; } + /** + * Gets the write filter. + * + * @return The write filter. + */ public PoolingFilter getAsynchronousWriteFilter() { return _asynchronousWriteFilter; } - public void buildFilterChain(IoFilterChain chain) throws Exception + /** + * Adds the concurrent read and write filters to a filter chain. + * + * @param chain The Mina filter chain to add to. + */ + public void buildFilterChain(IoFilterChain chain) { - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); } - - public static ReadWriteThreadModel getInstance() - { - return _instance; - } } diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 637464f247..84c9e1f465 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,44 +24,87 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** - * We share the executor service among several PoolingFilters. This class reference counts - * how many filter chains are using the executor service and destroys the service, thus - * freeing up its threads, when the count reaches zero. It recreates the service when - * the count is incremented. + * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts + * the references taken, instantiating the service on the first reference, and shutting it down when the last + * reference is released. + * + * <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM + * from terminating due to the existence of non-daemon threads. + * + * <p/><table id="crc><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide a shared exector service. <td> {@link Executors} + * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService} + * <tr><td> Track references to the executor service. + * <tr><td> Provide configuration of the executor service. + * </table> + * + * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the + * implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be + * generalized to reference count anything. That is, on first instance call a create method, on release of last + * instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a + * reference counting factory. It could then be re-used to do reference counting in other places (such as + * messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the + * ref counting factory can call to manage the lifecycle. * - * This is particularly important on the client where failing to destroy the executor - * service prevents the JVM from shutting down due to the existence of non-daemon threads. + * @todo {@link #_poolSize} should be static? * + * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used + * further checks are applied to ensure that the exector service has not been shutdown. This passes responsibility + * for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it + * here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an + * isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors. */ public class ReferenceCountingExecutorService { + /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */ private static final int MINIMUM_POOL_SIZE = 4; + + /** Holds the number of processors on the machine. */ private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + + /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */ private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE); /** - * We need to be able to check the current reference count and if necessary - * create the executor service atomically. + * Holds the singleton instance of this reference counter. This is only created once, statically, so the + * {@link #getInstance()} method does not need to be synchronized. */ private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService(); + /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */ private final Object _lock = new Object(); + /** The shared executor service that is reference counted. */ private ExecutorService _pool; + /** Holds the number of references given out to the executor service. */ private int _refCount = 0; + /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + /** + * Retrieves the singleton instance of this reference counter. + * + * @return The singleton instance of this reference counter. + */ public static ReferenceCountingExecutorService getInstance() { return _instance; } + /** + * Private constructor to ensure that only a singleton instance can be created. + */ private ReferenceCountingExecutorService() - { - } + { } + /** + * Provides a reference to a shared executor service, incrementing the reference count. + * + * @return An executor service. + */ ExecutorService acquireExecutorService() { synchronized (_lock) @@ -70,10 +113,15 @@ public class ReferenceCountingExecutorService { _pool = Executors.newFixedThreadPool(_poolSize); } + return _pool; } } + /** + * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls + * to zero, the executor service is shut down. + */ void releaseExecutorService() { synchronized (_lock) @@ -86,10 +134,9 @@ public class ReferenceCountingExecutorService } /** - * The filters that use the executor service should call this method to get access - * to the service. Note that this method does not alter the reference count. + * Provides access to the executor service, without touching the reference count. * - * @return the underlying executor service + * @return The shared executor service, or <tt>null</tt> if none has been instantiated yet. */ public ExecutorService getPool() { diff --git a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java index 165c912422..61955160be 100644 --- a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java +++ b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java @@ -36,7 +36,7 @@ import java.util.regex.*; * that they take a default value when not set. Options may be mandatory in wich case it is an error not to specify
* them on the command line. Flags are never mandatory because they are implicitly set to false when not specified.
*
- * <p/>Some examples command line are:
+ * <p/>Some example command lines are:
*
* <ul>
* <li>This one has two options that expect arguments:
|