diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-29 13:05:07 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-29 13:05:07 +0000 |
commit | 22b64fd33e448ffc54134b28c7226342f54b4683 (patch) | |
tree | 89a27bba6ddb8674b02a810775cfb148d6de8d67 /java/common | |
parent | 9bb75aae85d397bd9dd98c8230de3e576ce97572 (diff) | |
download | qpid-python-22b64fd33e448ffc54134b28c7226342f54b4683.tar.gz |
QPID-3789 : [Java] code tidyups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1237273 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
50 files changed, 208 insertions, 1975 deletions
diff --git a/java/common/Type.tpl b/java/common/Type.tpl index 7f9cfee268..4635c4e367 100644 --- a/java/common/Type.tpl +++ b/java/common/Type.tpl @@ -56,9 +56,9 @@ for t in types: out(" $name((byte) $code, $width, $fixed)") }; - public byte code; - public int width; - public boolean fixed; + private final byte code; + private final int width; + private final boolean fixed; Type(byte code, int width, boolean fixed) { @@ -67,6 +67,21 @@ for t in types: this.fixed = fixed; } + public byte getCode() + { + return code; + } + + public int getWidth() + { + return width; + } + + public boolean isFixed() + { + return fixed; + } + public static Type get(byte code) { switch (code) diff --git a/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java b/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java index 73ee747c07..1029544fd6 100644 --- a/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java +++ b/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java @@ -32,7 +32,7 @@ public class PropertyNameResolver } private static Map<Class<?>,Accessor> accessors = new HashMap<Class<?>,Accessor>(); - protected Map<String,QpidProperty> properties; + private Map<String,QpidProperty> properties; private static class BooleanAccessor implements Accessor { diff --git a/java/common/src/main/java/org/apache/qpid/AMQStoreException.java b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java index 8389fe5efa..45aa36a20b 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQStoreException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java @@ -20,10 +20,8 @@ */ package org.apache.qpid; -import java.sql.SQLException; - /** - * StoreException is a specific type of internal error relating to errors in the message store, such as {@link SQLException}. + * StoreException is a specific type of internal error relating to errors in the message store, such as {@link java.sql.SQLException}. */ public class AMQStoreException extends AMQInternalException { diff --git a/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java b/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java index aa262bdde5..dd94f8251b 100644 --- a/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java +++ b/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java @@ -26,6 +26,10 @@ package org.apache.qpid.common; */ public final class ServerPropertyNames { + private ServerPropertyNames() + { + } + /** * Server property: federation tag UUID */ diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index a36e7c214e..69a6602baf 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -116,16 +116,7 @@ public class ClientProperties */ public static final String REJECT_BEHAVIOUR_PROP_NAME = "qpid.reject.behaviour"; - /* - public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID"); - - public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence"); - - - public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME = - QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */ - - + private ClientProperties() + { + } } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java index 6e2b25fb2c..19e998733a 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java @@ -40,6 +40,10 @@ import java.util.Iterator; */ public class PropertyUtils { + private PropertyUtils() + { + } + /** * Given a string that contains substrings of the form <code>${xxx}</code>, looks up the valuea of 'xxx' as a * system properties and substitutes tham back into the original string, to provide a property value expanded diff --git a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java index 9c0aaaec89..e88c7784a2 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java @@ -24,7 +24,7 @@ abstract class QpidProperty<T> { private T defValue; private String[] names; - protected Accessor accessor; + private Accessor accessor; QpidProperty(T defValue, String... names) { @@ -101,7 +101,12 @@ abstract class QpidProperty<T> { return new QpidStringProperty(accessor,defaultValue, names); } - + + protected Accessor getAccessor() + { + return accessor; + } + static class QpidBooleanProperty extends QpidProperty<Boolean> { QpidBooleanProperty(Boolean defValue, String... names) @@ -117,7 +122,7 @@ abstract class QpidProperty<T> @Override protected Boolean getByName(String name) { - return accessor.getBoolean(name); + return getAccessor().getBoolean(name); } } @@ -136,7 +141,7 @@ abstract class QpidProperty<T> @Override protected Integer getByName(String name) { - return accessor.getInt(name); + return getAccessor().getInt(name); } } @@ -155,7 +160,7 @@ abstract class QpidProperty<T> @Override protected Long getByName(String name) { - return accessor.getLong(name); + return getAccessor().getLong(name); } } @@ -174,7 +179,7 @@ abstract class QpidProperty<T> @Override protected String getByName(String name) { - return accessor.getString(name); + return getAccessor().getString(name); } } diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java index 1989ade4ac..975ec4daca 100644 --- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java +++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java @@ -36,6 +36,10 @@ import org.apache.qpid.framing.AMQShortString; */ public class ExchangeDefaults { + private ExchangeDefaults() + { + } + /** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */ public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>"); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index f5fbbac42b..966a03605c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -52,10 +52,6 @@ public interface AMQMethodBody extends AMQBody public void writePayload(DataOutput buffer) throws IOException; - //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; - - //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; - public AMQFrame generateFrame(int channelId); public String toString(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 3951f17681..85870e68c5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -321,7 +321,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { final int size = length(); - //buffer.setAutoExpand(true); buffer.writeByte(size); buffer.write(_data, _offset, size); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java index a07fd78c8c..94a7d127b3 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java @@ -27,6 +27,10 @@ public class AMQTypeMap { public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<Byte, AMQType>(); + private AMQTypeMap() + { + } + static { for(AMQType type : AMQType.values()) diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index f942d33389..a6883917d5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -294,85 +294,83 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException { - // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int headersOffset = 0; - int headersOffset = 0; - - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - _contentType = buffer.readAMQShortString(); - headersOffset += EncodingUtils.encodedShortStringLength(_contentType); - } + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + _contentType = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_contentType); + } - if ((_propertyFlags & ENCODING_MASK) != 0) - { - _encoding = buffer.readAMQShortString(); - headersOffset += EncodingUtils.encodedShortStringLength(_encoding); - } + if ((_propertyFlags & ENCODING_MASK) != 0) + { + _encoding = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_encoding); + } - if ((_propertyFlags & HEADERS_MASK) != 0) - { - long length = EncodingUtils.readUnsignedInteger(buffer); + if ((_propertyFlags & HEADERS_MASK) != 0) + { + long length = EncodingUtils.readUnsignedInteger(buffer); - _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); + _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); - buffer.skipBytes((int)length); - } + buffer.skipBytes((int)length); + } - if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) - { - _deliveryMode = buffer.readByte(); - } + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + _deliveryMode = buffer.readByte(); + } - if ((_propertyFlags & PRIORITY_MASK) != 0) - { - _priority = buffer.readByte(); - } + if ((_propertyFlags & PRIORITY_MASK) != 0) + { + _priority = buffer.readByte(); + } - if ((_propertyFlags & CORRELATION_ID_MASK) != 0) - { - _correlationId = buffer.readAMQShortString(); - } + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + _correlationId = buffer.readAMQShortString(); + } - if ((_propertyFlags & REPLY_TO_MASK) != 0) - { - _replyTo = buffer.readAMQShortString(); - } + if ((_propertyFlags & REPLY_TO_MASK) != 0) + { + _replyTo = buffer.readAMQShortString(); + } - if ((_propertyFlags & EXPIRATION_MASK) != 0) - { - _expiration = EncodingUtils.readLongAsShortString(buffer); - } + if ((_propertyFlags & EXPIRATION_MASK) != 0) + { + _expiration = EncodingUtils.readLongAsShortString(buffer); + } - if ((_propertyFlags & MESSAGE_ID_MASK) != 0) - { - _messageId = buffer.readAMQShortString(); - } + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + _messageId = buffer.readAMQShortString(); + } - if ((_propertyFlags & TIMESTAMP_MASK) != 0) - { - _timestamp = EncodingUtils.readTimestamp(buffer); - } + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + _timestamp = EncodingUtils.readTimestamp(buffer); + } - if ((_propertyFlags & TYPE_MASK) != 0) - { - _type = buffer.readAMQShortString(); - } + if ((_propertyFlags & TYPE_MASK) != 0) + { + _type = buffer.readAMQShortString(); + } - if ((_propertyFlags & USER_ID_MASK) != 0) - { - _userId = buffer.readAMQShortString(); - } + if ((_propertyFlags & USER_ID_MASK) != 0) + { + _userId = buffer.readAMQShortString(); + } - if ((_propertyFlags & APPLICATION_ID_MASK) != 0) - { - _appId = buffer.readAMQShortString(); - } + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + _appId = buffer.readAMQShortString(); + } - if ((_propertyFlags & CLUSTER_ID_MASK) != 0) - { - _clusterId = buffer.readAMQShortString(); - } + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + _clusterId = buffer.readAMQShortString(); + } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index 538e0e1327..6d05c2fd3a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -41,6 +41,10 @@ public class EncodingUtils public static final int SIZEOF_UNSIGNED_INT = 4; private static final boolean[] ALL_FALSE_ARRAY = new boolean[8]; + private EncodingUtils() + { + } + public static int encodedShortStringLength(String s) { if (s == null) @@ -115,7 +119,7 @@ public class EncodingUtils { return len + 6 + encodedShortStringLength((short) (i / 1000000)); } - else // if (i > 99999) + else // if i > 99999 { return len + 5 + encodedShortStringLength((short) (i / 100000)); } @@ -737,8 +741,6 @@ public class EncodingUtils public static long readTimestamp(DataInput buffer) throws IOException { - // Discard msb from AMQ timestamp - // buffer.getUnsignedInt(); return buffer.readLong(); } @@ -803,8 +805,6 @@ public class EncodingUtils byte[] from = new byte[size]; - // Is this not the same. - // bb.get(from, 0, length); for (int i = 0; i < size; i++) { from[i] = bb.get(i); @@ -959,7 +959,6 @@ public class EncodingUtils else { // really writing out unsigned byte - //buffer.put((byte) 0); writeUnsignedInteger(buffer, 0L); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java index b2b78ebcef..a2d4d27396 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -25,6 +25,10 @@ import java.io.IOException; public class FieldTableFactory { + private FieldTableFactory() + { + } + public static FieldTable newFieldTable() { return new FieldTable(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 3c1e67f488..0a06f0f1e9 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -43,9 +43,6 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData private final byte _protocolMajor; private final byte _protocolMinor; - -// public ProtocolInitiation() {} - public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor) { _protocolHeader = protocolHeader; diff --git a/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java b/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java index 225f7c11e9..c5c2cacd1a 100644 --- a/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java +++ b/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java @@ -35,6 +35,9 @@ import java.util.List; public class JAddr { + private JAddr() + { + } public static final void main(String[] args) throws Exception { diff --git a/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java b/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java index ef6c724371..2681893482 100644 --- a/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java +++ b/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java @@ -33,6 +33,9 @@ import java.util.Map; public class PyPrint { + private PyPrint() + { + } public static String pprint(Object obj) { diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 260c17f193..185c01d3df 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -51,7 +51,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto * * @return The method registry for a specific version of the AMQP. */ -// public VersionSpecificRegistry getRegistry(); MethodRegistry getMethodRegistry(); diff --git a/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/java/common/src/main/java/org/apache/qpid/thread/Threading.java index 603e8a7441..265b336157 100644 --- a/java/common/src/main/java/org/apache/qpid/thread/Threading.java +++ b/java/common/src/main/java/org/apache/qpid/thread/Threading.java @@ -24,7 +24,11 @@ package org.apache.qpid.thread; public final class Threading { private static ThreadFactory threadFactory; - + + private Threading() + { + } + static { try { diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java index 0f19d7e2b2..0f049aba8e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java +++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java @@ -22,6 +22,10 @@ package org.apache.qpid.transport; public class RangeSetFactory { + private RangeSetFactory() + { + } + public static RangeSet createRangeSet() { return new RangeSetImpl(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 6784bc2279..d450746eaa 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -410,7 +410,6 @@ public class Session extends SessionInvoker log.debug("ID: [%s] %s", this.channel, id); } - //if ((id % 65536) == 0) if ((id & 0xff) == 0) { flushProcessed(TIMELY_REPLY); diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java index 8ac8093645..27fce6e167 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java @@ -361,13 +361,13 @@ abstract class AbstractDecoder implements Decoder private long readSize(Type t) { - if (t.fixed) + if (t.isFixed()) { - return t.width; + return t.getWidth(); } else { - return readSize(t.width); + return readSize(t.getWidth()); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index 5e0207c979..a38c83d4cb 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -359,7 +359,7 @@ abstract class AbstractEncoder implements Encoder Object value = entry.getValue(); Type type = encoding(value); writeStr8(key); - put(type.code); + put(type.getCode()); write(type, value); } } @@ -380,7 +380,7 @@ abstract class AbstractEncoder implements Encoder for (Object value : list) { Type type = encoding(value); - put(type.code); + put(type.getCode()); write(type, value); } } @@ -408,7 +408,7 @@ abstract class AbstractEncoder implements Encoder type = encoding(array.get(0)); } - put(type.code); + put(type.getCode()); writeUint32(array.size()); @@ -420,18 +420,18 @@ abstract class AbstractEncoder implements Encoder private void writeSize(Type t, int size) { - if (t.fixed) + if (t.isFixed()) { - if (size != t.width) + if (size != t.getWidth()) { throw new IllegalArgumentException - ("size does not match fixed width " + t.width + ": " + + ("size does not match fixed width " + t.getWidth() + ": " + size); } } else { - writeSize(t.width, size); + writeSize(t.getWidth(), size); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index 6a6a2b75d1..55ba95ad75 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -54,6 +54,10 @@ public class Transport OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map); } + private Transport() + { + } + public static IncomingNetworkTransport getIncomingTransportInstance() { return (IncomingNetworkTransport) loadTransportClass( diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java index 6388762026..d51491862b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java @@ -37,6 +37,10 @@ import java.nio.ByteBuffer; public class SecurityLayerFactory { + private SecurityLayerFactory() + { + } + public static SecurityLayer newInstance(ConnectionSettings settings) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java index 6c615c05b6..625e1a77c2 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java @@ -30,10 +30,10 @@ import javax.security.sasl.SaslClient; public abstract class SASLEncryptor implements ConnectionListener { - protected SaslClient saslClient; - protected boolean securityLayerEstablished = false; - protected int sendBuffSize; - protected int recvBuffSize; + private SaslClient saslClient; + private boolean securityLayerEstablished = false; + private int sendBuffSize; + private int recvBuffSize; public boolean isSecurityLayerEstablished() { @@ -61,4 +61,19 @@ public abstract class SASLEncryptor implements ConnectionListener public void closed(Connection conn) {} public abstract void securityLayerEstablished(); + + public SaslClient getSaslClient() + { + return saslClient; + } + + public int getSendBuffSize() + { + return sendBuffSize; + } + + public int getRecvBuffSize() + { + return recvBuffSize; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java index a9321fbec1..a100b96412 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java @@ -56,11 +56,11 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> { while (buf.hasRemaining()) { - int length = Math.min(buf.remaining(),recvBuffSize); + int length = Math.min(buf.remaining(), getRecvBuffSize()); buf.get(netData, 0, length); try { - byte[] out = saslClient.unwrap(netData, 0, length); + byte[] out = getSaslClient().unwrap(netData, 0, length); delegate.received(ByteBuffer.wrap(out)); } catch (SaslException e) @@ -77,7 +77,7 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer> public void securityLayerEstablished() { - netData = new byte[recvBuffSize]; + netData = new byte[getRecvBuffSize()]; log.debug("SASL Security Layer Established"); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index ce80f32eb8..61d54a8386 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { - protected Sender<ByteBuffer> delegate; + private Sender<ByteBuffer> delegate; private byte[] appData; private final AtomicBoolean closed = new AtomicBoolean(false); private static final Logger log = Logger.get(SASLSender.class); @@ -52,7 +52,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { { try { - saslClient.dispose(); + getSaslClient().dispose(); } catch (SaslException e) { @@ -78,14 +78,14 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { { while (buf.hasRemaining()) { - int length = Math.min(buf.remaining(),sendBuffSize); - log.debug("sendBuffSize %s", sendBuffSize); + int length = Math.min(buf.remaining(), getSendBuffSize()); + log.debug("sendBuffSize %s", getSendBuffSize()); log.debug("buf.remaining() %s", buf.remaining()); buf.get(appData, 0, length); try { - byte[] out = saslClient.wrap(appData, 0, length); + byte[] out = getSaslClient().wrap(appData, 0, length); log.debug("out.length %s", out.length); delegate.send(ByteBuffer.wrap(out)); @@ -110,7 +110,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { public void securityLayerEstablished() { - appData = new byte[sendBuffSize]; + appData = new byte[getSendBuffSize()]; log.debug("SASL Security Layer Established"); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java index 8f01719935..71a73db71f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java @@ -38,7 +38,11 @@ import java.security.cert.X509Certificate; public class SSLUtil { private static final Logger log = Logger.get(SSLUtil.class); - + + private SSLUtil() + { + } + public static void verifyHostname(SSLEngine engine,String hostnameExpected) { try diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java index d76d0e249f..bd3e9bbcbc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java @@ -31,8 +31,11 @@ import static java.lang.Math.min; * @author Rafael H. Schloming */ -public class Functions +public final class Functions { + private Functions() + { + } public static final int mod(int n, int m) { diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java index 6da468d1b8..fe7b01761b 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java @@ -184,7 +184,7 @@ public class BindingURLParser char nextChar = _url[_index]; // check for the following special cases. - // "myQueue?durable='true'" or just "myQueue"; + // "myQueue?durable='true'" or just "myQueue" StringBuilder builder = new StringBuilder(); while (nextChar != COLON_CHAR && nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java index 94d97d3023..8516e7fa0e 100644 --- a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java +++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java @@ -28,6 +28,10 @@ public class URLHelper public static final char ALTERNATIVE_OPTION_SEPARATOR = ','; public static final char BROKER_SEPARATOR = ';'; + private URLHelper() + { + } + public static void parseOptions(Map<String, String> optionMap, String options) throws URLSyntaxException { if ((options == null) || (options.indexOf('=') == -1)) diff --git a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java index e4cebb6926..3d17bbf6ea 100644 --- a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java +++ b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java @@ -329,9 +329,6 @@ public class CommandLineParser expectingArgs = true; optionExpectingArgs = matchedOption; - // In the mean time set this options argument to the empty string in case no argument is ever - // supplied. - // options.put(matchedOption, ""); } // Check if the option was matched on its own and is a flag in which case set that flag. diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java deleted file mode 100644 index 0f9bd64233..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; - -public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E> -{ - private static final Logger _logger = LoggerFactory.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class); - - protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>(); - - protected AtomicInteger _messageHeadSize = new AtomicInteger(0); - - @Override - public int size() - { - return super.size() + _messageHeadSize.get(); - } - - public int headSize() - { - return _messageHeadSize.get(); - } - - @Override - public E poll() - { - if (_messageHead.isEmpty()) - { - return super.poll(); - } - else - { - E e = _messageHead.poll(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Providing item(" + e + ")from message head"); - } - - if (e != null) - { - _messageHeadSize.decrementAndGet(); - } - - return e; - } - } - - @Override - public boolean remove(Object o) - { - - if (_messageHead.isEmpty()) - { - return super.remove(o); - } - else - { - if (_messageHead.remove(o)) - { - _messageHeadSize.decrementAndGet(); - - return true; - } - - return super.remove(o); - } - } - - @Override - public boolean removeAll(Collection<?> c) - { - if (_messageHead.isEmpty()) - { - return super.removeAll(c); - } - else - { - // fixme this is super.removeAll but iterator here doesn't work - // we need to be able to correctly decrement _messageHeadSize - // boolean modified = false; - // Iterator<?> e = iterator(); - // while (e.hasNext()) - // { - // if (c.contains(e.next())) - // { - // e.remove(); - // modified = true; - // _size.decrementAndGet(); - // } - // } - // return modified; - - throw new RuntimeException("Not implemented"); - } - } - - @Override - public boolean isEmpty() - { - return (_messageHead.isEmpty() && super.isEmpty()); - } - - @Override - public void clear() - { - super.clear(); - _messageHead.clear(); - } - - @Override - public boolean contains(Object o) - { - return _messageHead.contains(o) || super.contains(o); - } - - @Override - public boolean containsAll(Collection<?> o) - { - return _messageHead.containsAll(o) || super.containsAll(o); - } - - @Override - public E element() - { - if (_messageHead.isEmpty()) - { - return super.element(); - } - else - { - return _messageHead.element(); - } - } - - @Override - public E peek() - { - if (_messageHead.isEmpty()) - { - return super.peek(); - } - else - { - E o = _messageHead.peek(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Peeking item (" + o + ") from message head"); - } - - return o; - } - - } - - @Override - public Iterator<E> iterator() - { - final Iterator<E> mainMessageIterator = super.iterator(); - - return new Iterator<E>() - { - private final Iterator<E> _headIterator = _messageHead.iterator(); - private final Iterator<E> _mainIterator = mainMessageIterator; - - private Iterator<E> last; - - public boolean hasNext() - { - return _headIterator.hasNext() || _mainIterator.hasNext(); - } - - public E next() - { - if (_headIterator.hasNext()) - { - last = _headIterator; - - return _headIterator.next(); - } - else - { - last = _mainIterator; - - return _mainIterator.next(); - } - } - - public void remove() - { - last.remove(); - if(last == _mainIterator) - { - decrementSize(); - } - else - { - _messageHeadSize.decrementAndGet(); - } - } - }; - } - - @Override - public boolean retainAll(Collection<?> c) - { - throw new RuntimeException("Not Implemented"); - } - - @Override - public Object[] toArray() - { - throw new RuntimeException("Not Implemented"); - } - - public boolean pushHead(E o) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Adding item(" + o + ") to head of queue"); - } - - if (_messageHead.offer(o)) - { - _messageHeadSize.incrementAndGet(); - - return true; - } - - return false; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java deleted file mode 100644 index dd91a067b5..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - -public class ConcurrentLinkedQueueAtomicSize<E> extends ConcurrentLinkedQueue<E> -{ - private AtomicInteger _size = new AtomicInteger(0); - - public int size() - { - return _size.get(); - } - - protected final void decrementSize() - { - _size.decrementAndGet(); - } - - - - public boolean offer(E o) - { - - if (super.offer(o)) - { - _size.incrementAndGet(); - return true; - } - - return false; - } - - public E poll() - { - E e = super.poll(); - - if (e != null) - { - _size.decrementAndGet(); - } - - return e; - } - - @Override - public boolean remove(Object o) - { - if (super.remove(o)) - { - _size.decrementAndGet(); - return true; - } - - return false; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index ac8e3da3c2..2d3e321812 100644 --- a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -46,6 +46,10 @@ import java.util.List; */ public class FileUtils { + private FileUtils() + { + } + /** * Reads a text file as a string. * diff --git a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java b/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java deleted file mode 100644 index 93266f2486..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.util; - -/** - * Contains pretty printing convenienve methods for producing formatted logging output, mostly for debugging purposes. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * </table> - * - * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays. - */ -public class PrettyPrintingUtils -{ - /** - * Pretty prints an array of ints as a string. - * - * @param array The array to pretty print. - * - * @return The pretty printed string. - */ - public static String printArray(int[] array) - { - StringBuilder result = new StringBuilder("["); - for (int i = 0; i < array.length; i++) - { - result.append(array[i]) - .append((i < (array.length - 1)) ? ", " : ""); - } - - result.append(']'); - - return result.toString(); - } - - /** - * Pretty prints an array of strings as a string. - * - * @param array The array to pretty print. - * - * @return The pretty printed string. - */ - public static String printArray(String[] array) - { - String result = "["; - for (int i = 0; i < array.length; i++) - { - result += array[i]; - result += (i < (array.length - 1)) ? ", " : ""; - } - - result += "]"; - - return result; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/Serial.java b/java/common/src/main/java/org/apache/qpid/util/Serial.java index 7961e4ba3c..451d5d60eb 100644 --- a/java/common/src/main/java/org/apache/qpid/util/Serial.java +++ b/java/common/src/main/java/org/apache/qpid/util/Serial.java @@ -30,6 +30,9 @@ import java.util.Comparator; public class Serial { + private Serial() + { + } public static final Comparator<Integer> COMPARATOR = new Comparator<Integer>() { diff --git a/java/common/src/main/java/org/apache/qpid/util/Strings.java b/java/common/src/main/java/org/apache/qpid/util/Strings.java index da3aab3192..f2d51ccfde 100644 --- a/java/common/src/main/java/org/apache/qpid/util/Strings.java +++ b/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -36,6 +36,9 @@ import java.util.regex.Pattern; public final class Strings { + private Strings() + { + } private static final byte[] EMPTY = new byte[0]; diff --git a/java/common/src/main/java/org/apache/qpid/util/UUIDs.java b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java index 4bf6b7f0a2..d9b2dd8413 100644 --- a/java/common/src/main/java/org/apache/qpid/util/UUIDs.java +++ b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java @@ -28,6 +28,9 @@ package org.apache.qpid.util; public final class UUIDs { + private UUIDs() + { + } public static final UUIDGen newGenerator() { diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java deleted file mode 100644 index e0c0337898..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * Used to signal that a data element and its producer cannot be requeued or sent an error message when using a - * {@link BatchSynchQueue} because the producer has already been unblocked by an unblocking take on the queue. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Signal that an unblocking take has already occurred. - * </table> - */ -public class AlreadyUnblockedException extends RuntimeException -{ } diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java deleted file mode 100644 index 63d8f77edb..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java +++ /dev/null @@ -1,122 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.Collection; -import java.util.concurrent.BlockingQueue; - -/** - * BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this - * pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform usefull - * work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is - * consumed or until a consumer chooses to release the producer some time after consuming the data from the queue. - * - * <p>There are a number of possible advantages to using this technique when compared with having the producers - * processing their own data: - * - * <ul> - * <li>Data may be deposited asynchronously in the buffer allowing the producers to continue running.</li> - * <li>Data may be deposited synchronously in the buffer so that producers wait until their data has been processed - * before being allowed to continue.</li> - * <li>Variable rates of production/consumption can be smoothed over by the buffer as it provides space in memory to - * hold data between production and consumption.</li> - * <li>Consumers may be able to batch data as they consume it leading to more efficient consumption over - * individual data item consumption where latency associated with the consume operation can be ammortized. - * For example, it may be possibly to ammortize the cost of a disk seek over many producers.</li> - * <li>Data from seperate threads can be combined together in the buffer, providing a convenient way of spreading work - * amongst many workers and gathering the results together again.</li> - * <li>Different types of queue can be used to hold the buffer, resulting in different processing orders. For example, - * lifo, fifo, priority heap, etc.</li> - * </ul> - * - * <p/>The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package - * (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the - * blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional - * take methods that can be used to take data from a queue without releasing producers, so that consumers have an - * opportunity to confirm correct processing of the data before producers are released. It also adds a put method with - * exceptions so that consumers can signal exception cases back to producers where there are errors in the data. - * - * <p/>This type of queue is usefull in situations where consumers can obtain an efficiency gain by batching data - * from many threads but where synchronous handling of that data is neccessary because producers need to know that - * their data has been processed before they continue. For example, sending a bundle of messages together, or writing - * many records to disk at once, may result in improved performance but the originators of the messages or disk records - * need confirmation that their data has really been sent or saved to disk. - * - * <p/>The consumer can put an element back onto the queue or send an error message to the elements producer using the - * {@link SynchRecord} interface. - * - * <p/>The {@link #take()}, {@link #drainTo(java.util.Collection<? super E>)} and - * {@link #drainTo(java.util.Collection<? super E>, int)} methods from {@link BlockingQueue} should behave as if they - * have been called with unblock set to false. That is they take elements from the queue but leave the producers - * blocked. These methods do not return collections of {@link SynchRecord}s so they do not supply an interface through - * which errors or re-queuings can be applied. If these methods are used then the consumer must succesfully process - * all the records it takes. - * - * <p/>The {@link #put} method should silently swallow any exceptions that consumers attempt to return to the caller. - * In order to handle exceptions the {@link #tryPut} method must be used. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Handle synchronous puts, with possible exceptions. - * <tr><td> Allow consumers to take many records from a queue in a batch. - * <tr><td> Allow consumers to decide when to unblock synchronous producers. - * </table> - */ -public interface BatchSynchQueue<E> extends BlockingQueue<E> -{ - /** - * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the - * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. - * - * @param e The data element to put into the queue. - * - * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting - * on its entry in the queue being consumed. - * @throws SynchException If a consumer encounters an error whilst processing the data element. - */ - public void tryPut(E e) throws InterruptedException, SynchException; - - /** - * Takes all available data items from the queue or blocks until some become available. The returned items - * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param c The collection to drain the data items into. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock); - - /** - * Takes up to maxElements available data items from the queue or blocks until some become available. The returned - * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param c The collection to drain the data items into. - * @param maxElements The maximum number of elements to drain. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> c, int maxElements, boolean unblock); -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java deleted file mode 100644 index 1d5f3b5e46..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java +++ /dev/null @@ -1,837 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.AbstractQueue; -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - - -/** - * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data. - * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being - * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and - * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is - * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch. - * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls. - * - * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete - * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract}, - * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement - * the buffer other than by a queue, for example, by using an array. - * - * <p/>Normal queue methods to work asynchronously. - * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately - * when their data is taken. - * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the - * option to keep producers blocked until the consumer decides to release them. - * - * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to - * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency - * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io) - * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the - * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an - * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to - * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * </table> - */ -public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E> -{ - /** Used for logging. */ - private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class); - - /** Holds a reference to the queue implementation that holds the buffer. */ - private Queue<SynchRecordImpl<E>> buffer; - - /** Holds the number of items in the queue */ - private int count; - - /** Main lock guarding all access */ - private ReentrantLock lock; - - /** Condition for waiting takes */ - private Condition notEmpty; - - /** Condition for waiting puts */ - private Condition notFull; - - /** - * Creates a batch synch queue without fair thread scheduling. - */ - public BatchSynchQueueBase() - { - this(false); - } - - /** - * Ensures that the underlying buffer implementation is created. - * - * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer. - */ - public BatchSynchQueueBase(boolean fair) - { - buffer = this.createQueue(); - - // Create the buffer lock with the fairness flag set accordingly. - lock = new ReentrantLock(fair); - - // Create the non-empty and non-full condition monitors on the buffer lock. - notEmpty = lock.newCondition(); - notFull = lock.newCondition(); - } - - /** - * Returns an iterator over the elements contained in this collection. - * - * @return An iterator over the elements contained in this collection. - */ - public Iterator<E> iterator() - { - throw new RuntimeException("Not implemented."); - } - - /** - * Returns the number of elements in this collection. If the collection contains more than - * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>. - * - * @return The number of elements in this collection. - */ - public int size() - { - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return count; - } - finally - { - lock.unlock(); - } - } - - /** - * Inserts the specified element into this queue, if possible. When using queues that may impose insertion - * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method - * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception. - * - * @param e The element to insert. - * - * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt> - */ - public boolean offer(E e) - { - if (e == null) - { - throw new NullPointerException(); - } - - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return insert(e, false); - } - finally - { - lock.unlock(); - } - } - - /** - * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to - * become available. - * - * @param e The element to add. - * @param timeout How long to wait before giving up, in units of <tt>unit</tt> - * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter. - * - * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is - * available. - * - * @throws InterruptedException If interrupted while waiting. - * @throws NullPointerException If the specified element is <tt>null</tt>. - */ - public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException - { - if (e == null) - { - throw new NullPointerException(); - } - - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - - long nanos = unit.toNanos(timeout); - - try - { - do - { - if (insert(e, false)) - { - return true; - } - - try - { - nanos = notFull.awaitNanos(nanos); - } - catch (InterruptedException ie) - { - notFull.signal(); // propagate to non-interrupted thread - throw ie; - } - } - while (nanos > 0); - - return false; - } - finally - { - lock.unlock(); - } - } - - /** - * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty. - * - * @return The head of this queue, or <tt>null</tt> if this queue is empty. - */ - public E poll() - { - final ReentrantLock lock = this.lock; - - lock.lock(); - try - { - if (count == 0) - { - return null; - } - - E x = extract(true, true).getElement(); - - return x; - } - finally - { - lock.unlock(); - } - } - - /** - * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements - * are present on this queue. - * - * @param timeout How long to wait before giving up, in units of <tt>unit</tt>. - * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter. - * - * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present. - * - * @throws InterruptedException If interrupted while waiting. - */ - public E poll(long timeout, TimeUnit unit) throws InterruptedException - { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try - { - long nanos = unit.toNanos(timeout); - - do - { - if (count != 0) - { - E x = extract(true, true).getElement(); - - return x; - } - - try - { - nanos = notEmpty.awaitNanos(nanos); - } - catch (InterruptedException ie) - { - notEmpty.signal(); // propagate to non-interrupted thread - throw ie; - } - } - while (nanos > 0); - - return null; - } - finally - { - lock.unlock(); - } - } - - /** - * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty. - * - * @return The head of this queue, or <tt>null</tt> if this queue is empty. - */ - public E peek() - { - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return peekAtBufferHead(); - } - finally - { - lock.unlock(); - } - } - - /** - * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints) - * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit. - * - * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by - * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt> - * or <tt>take</tt> an element. - * - * @return The remaining capacity. - */ - public int remainingCapacity() - { - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - return getBufferCapacity() - count; - } - finally - { - lock.unlock(); - } - } - - /** - * Adds the specified element to this queue, waiting if necessary for space to become available. - * - * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised - * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these - * exceptions. - * - * @param e The element to add. - * - * @throws InterruptedException If interrupted while waiting. - */ - public void put(E e) throws InterruptedException - { - try - { - tryPut(e); - } - catch (SynchException ex) - { - // This exception is deliberately ignored. See the method comment for information about this. - } - } - - /** - * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the - * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}. - * - * @param e The data element to put into the queue. Cannot be null. - * - * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting - * on its entry in the queue being consumed. - * @throws SynchException If a consumer encounters an error whilst processing the data element. - */ - public void tryPut(E e) throws InterruptedException, SynchException - { - if (e == null) - { - throw new NullPointerException(); - } - - // final Queue<E> items = this.buffer; - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - - try - { - while (count == getBufferCapacity()) - { - // Release the lock and wait until the queue is not full. - notFull.await(); - } - } - catch (InterruptedException ie) - { - notFull.signal(); // propagate to non-interrupted thread - throw ie; - } - - // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block - // the producer until its data is taken. - insert(e, true); - } - - /** - * Retrieves and removes the head of this queue, waiting if no elements are present on this queue. - * Any producer that has its data element taken by this call will be immediately unblocked. To keep the - * producer blocked whilst taking just a single item, use the - * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)} - * method. There is no take method to do that because there is not usually any advantage in a synchronous hand - * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption - * latencies accross many producers where possible. - * - * @return The head of this queue. - * - * @throws InterruptedException if interrupted while waiting. - */ - public E take() throws InterruptedException - { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - - try - { - try - { - while (count == 0) - { - // Release the lock and wait until the queue becomes non-empty. - notEmpty.await(); - } - } - catch (InterruptedException ie) - { - notEmpty.signal(); // propagate to non-interrupted thread - throw ie; - } - - // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is - // not full, and unblock the producer that owns the data item that is taken. - E x = extract(true, true).getElement(); - - return x; - } - finally - { - lock.unlock(); - } - } - - /** - * Removes all available elements from this queue and adds them into the given collection. This operation may be - * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements - * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated - * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further, - * the behavior of this operation is undefined if the specified collection is modified while the operation is in - * progress. - * - * @param objects The collection to transfer elements into. - * - * @return The number of elements transferred. - * - * @throws NullPointerException If objects is null. - * @throws IllegalArgumentException If objects is this queue. - */ - public int drainTo(Collection<? super E> objects) - { - return drainTo(objects, -1); - } - - /** - * Removes at most the given number of available elements from this queue and adds them into the given collection. - * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements - * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue - * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if - * the specified collection is modified while the operation is in progress. - * - * @param objects The collection to transfer elements into. - * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning - * all elements. - * - * @return The number of elements transferred. - * - * @throws NullPointerException If c is null. - * @throws IllegalArgumentException If c is this queue. - */ - public int drainTo(Collection<? super E> objects, int maxElements) - { - if (objects == null) - { - throw new NullPointerException(); - } - - if (objects == this) - { - throw new IllegalArgumentException(); - } - - // final Queue<E> items = this.buffer; - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - int n = 0; - - for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) - { - // Take items from the queue, do unblock the producers, but don't send not full signals yet. - objects.add(extract(true, false).getElement()); - } - - if (n > 0) - { - // count -= n; - notFull.signalAll(); - } - - return n; - } - finally - { - lock.unlock(); - } - } - - /** - * Takes all available data items from the queue or blocks until some become available. The returned items - * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param c The collection to drain the data items into. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock) - { - return drainTo(c, -1, unblock); - } - - /** - * Takes up to maxElements available data items from the queue or blocks until some become available. The returned - * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their - * producers, where the producers are still blocked. - * - * @param coll The collection to drain the data items into. - * @param maxElements The maximum number of elements to drain. - * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked. - * - * @return A count of the number of elements that were drained from the queue. - */ - public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock) - { - if (coll == null) - { - throw new NullPointerException(); - } - - // final Queue<E> items = this.buffer; - final ReentrantLock lock = this.lock; - lock.lock(); - - try - { - int n = 0; - - for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++) - { - // Extract the next record from the queue, don't signall the not full condition yet and release - // producers depending on whether the caller wants to or not. - coll.add(extract(false, unblock)); - } - - if (n > 0) - { - // count -= n; - notFull.signalAll(); - } - - return new SynchRefImpl(n, coll); - } - finally - { - lock.unlock(); - } - } - - /** - * This abstract method should be overriden to return an empty queue. Different implementations of producer - * consumer buffers can control the order in which data is accessed using different queue implementations. - * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete - * implementations. - * - * @return An empty queue. - */ - protected abstract <T> Queue<T> createQueue(); - - /** - * Insert element into the queue, then possibly signal that the queue is not empty and block the producer - * on the element until permission to procede is given. - * - * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process - * will be able to get access to the queue. Hence, unlock and block are always set together. - * - * <p/>Call only when holding the global lock. - * - * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked. - * - * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this - * method may not return straight away, but only after the producer is unblocked by having its data - * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no - * matter what value the unlockAndBlock flag has, leaving the global lock on. - */ - protected boolean insert(E x, boolean unlockAndBlock) - { - // Create a new record for the data item. - SynchRecordImpl<E> record = new SynchRecordImpl<E>(x); - - boolean result = buffer.offer(record); - - if (result) - { - count++; - - // Tell any waiting consumers that the queue is not empty. - notEmpty.signal(); - - if (unlockAndBlock) - { - // Allow other threads to read/write the queue. - lock.unlock(); - - // Wait until a consumer takes this data item. - record.waitForConsumer(); - } - - return true; - } - else - { - return false; - } - } - - /** - * Extract element at current take position, advance, and signal. - * - * <p/>Call only when holding lock. - */ - protected SynchRecordImpl<E> extract(boolean unblock, boolean signal) - { - SynchRecordImpl<E> result = buffer.remove(); - count--; - - if (signal) - { - notFull.signal(); - } - - if (unblock) - { - result.releaseImmediately(); - } - - return result; - } - - /** - * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned. - * - * <p/>Call only when holding lock. - * - * @return The maximum capacity of the buffer. - */ - protected int getBufferCapacity() - { - if (buffer instanceof Capacity) - { - return ((Capacity) buffer).getCapacity(); - } - else - { - return Integer.MAX_VALUE; - } - } - - /** - * Return the head element from the buffer. - * - * <p/>Call only when holding lock. - * - * @return The head element from the buffer. - */ - protected E peekAtBufferHead() - { - return buffer.peek().getElement(); - } - - public class SynchRefImpl implements SynchRef - { - /** Holds the number of synch records associated with this reference. */ - private int numRecords; - - /** Holds a reference to the collection of synch records managed by this. */ - private Collection<SynchRecord<E>> records; - - public SynchRefImpl(int n, Collection<SynchRecord<E>> records) - { - this.numRecords = n; - this.records = records; - } - - public int getNumRecords() - { - return numRecords; - } - - /** - * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked - * when this method is called. The exception to this is producers that have had their data put back onto the queue - * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked - * but will not return from their put call normally, but with an exception instead. - */ - public void unblockProducers() - { - log.debug("public void unblockProducers(): called"); - - if (records != null) - { - for (SynchRecord<E> record : records) - { - // This call takes account of items that have already been released, are to be requeued or are in - // error. - record.releaseImmediately(); - } - } - - records = null; - } - } - - /** - * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows - * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when - * its data cannot be consumed. - */ - public class SynchRecordImpl<E> implements SynchRecord<E> - { - /** A boolean latch that determines when the producer for this data item will be allowed to continue. */ - private BooleanLatch latch = new BooleanLatch(); - - /** The data element associated with this item. */ - private E element; - - /** - * Create a new synch record. - * - * @param e The data element that the record encapsulates. - */ - public SynchRecordImpl(E e) - { - // Keep the data element. - element = e; - } - - /** - * Waits until the producer is given permission to proceded by a consumer. - */ - public void waitForConsumer() - { - latch.await(); - } - - /** - * Gets the data element contained by this record. - * - * @return The data element contained by this record. - */ - public E getElement() - { - return element; - } - - /** - * Immediately releases the producer of this data record. Consumers can bring the synchronization time of - * producers to a minimum by using this method to release them at the earliest possible moment when batch - * consuming records from sychronized producers. - */ - public void releaseImmediately() - { - // Check that the record has not already been released, is in error or is to be requeued. - latch.signal(); - - // Propagate errors to the producer. - - // Requeue items to be requeued. - } - - /** - * Tells the synch queue to put this element back onto the queue instead of releasing its producer. - * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or - * the {@link #releaseImmediately()} method. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this - * element has already been unblocked. - */ - public void reQueue() - { - throw new RuntimeException("Not implemented."); - } - - /** - * Tells the synch queue to raise an exception with this elements producer. The exception is not raised - * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the - * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is - * raised on the producer. - * - * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used - * because the exception is to be passed onto a different thread. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this - * element has already been unblocked. - * - * @param e The exception to raise on the producer. - */ - public void inError(Exception e) - { - throw new RuntimeException("Not implemented."); - } - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java deleted file mode 100644 index 0e4a07594f..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java +++ /dev/null @@ -1,128 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.concurrent.locks.AbstractQueuedSynchronizer; - -/** - * A BooleanLatch is like a set of traffic lights, where threads can wait at a red light until another thread gives - * the green light. When threads arrive at the latch it is initially red. They queue up until the green signal is - * given, at which point they can all acquire the latch in shared mode and continue to run concurrently. Once the latch - * is signalled it cannot be reset to red again. - * - * <p/> The latch uses a {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} to implement its synchronization. - * This has two internal states, 0 which means that the latch is blocked, and 1 which means that the latch is open. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Block threads until a go signal is given. - * </table> - * - * @todo Might be better to use a countdown latch to count down from 1. Its await method can throw interrupted - * exception which makes the possibility of interruption more explicit, and provides a reminder to recheck the - * latch condition before continuing. - */ -public class BooleanLatch -{ - /** Holds the synchronizer that provides the thread queueing synchronization. */ - private final Sync sync = new Sync(); - - /** - * Tests whether or not the latch has been signalled, that is to say that, the light is green. - * - * <p/>This method is non-blocking. - * - * @return <tt>true</tt> if the latch may be acquired; the light is green. - */ - public boolean isSignalled() - { - return sync.isSignalled(); - } - - /** - * Waits on the latch until the signal is given and the light is green. If the light is already green then the - * latch will be acquired and the thread will not have to wait. - * - * <p/>This method will block until the go signal is given or the thread is otherwise interrupted. Before carrying - * out any processing threads that return from this method should confirm that the go signal has really been given - * on this latch by calling the {@link #isSignalled()} method. - */ - public void await() - { - sync.acquireShared(1); - } - - /** - * Releases any threads currently waiting on the latch. This flips the light to green allowing any threads that - * were waiting for this condition to now run. - * - * <p/>This method is non-blocking. - */ - public void signal() - { - sync.releaseShared(1); - } - - /** - * Implements a thread queued synchronizer. The internal state 0 means that the queue is blocked and the internl - * state 1 means that the queue is released and that all waiting threads can acquire the synchronizer in shared - * mode. - */ - private static class Sync extends AbstractQueuedSynchronizer - { - /** - * Attempts to acquire this synchronizer in shared mode. It may be acquired once it has been released. - * - * @param ignore This parameter is ignored. - * - * @return 1 if the shared acquisition succeeds and -1 if it fails. - */ - protected int tryAcquireShared(int ignore) - { - return isSignalled() ? 1 : -1; - } - - /** - * Releases the synchronizer, setting its internal state to 1. - * - * @param ignore This parameter is ignored. - * - * @return <tt>true</tt> always. - */ - protected boolean tryReleaseShared(int ignore) - { - setState(1); - - return true; - } - - /** - * Tests if the synchronizer is signalled. It is signalled when its internal state it 1. - * - * @return <tt>true</tt> if the internal state is 1, <tt>false</tt> otherwise. - */ - boolean isSignalled() - { - return getState() != 0; - } - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java deleted file mode 100644 index a97ce0e172..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * An interface exposed by data structures that have a maximum capacity. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Report the maximum capacity. - * </table> - */ -public interface Capacity -{ - public int getCapacity(); -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java deleted file mode 100644 index bc63eb0353..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.Queue; - -/** - * SynchBuffer completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying - * queue as an array. This uses FIFO ordering for the queue but restricts the maximum size of the queue to a fixed - * amount. It also has the advantage that, as the buffer does not grow and shrink dynamically, memory for the buffer - * is allocated up front and does not create garbage during the operation of the queue. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Provide array based FIFO queue to create a batch synched queue around. - * </table> - * - * @todo Write an array based buffer implementation that implements Queue. - */ -public class SynchBuffer<E> extends BatchSynchQueueBase<E> -{ - /** - * Returns an empty queue, implemented as an array. - * - * @return An empty queue, implemented as an array. - */ - protected <T> Queue<T> createQueue() - { - throw new RuntimeException("Not implemented."); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java deleted file mode 100644 index 23ee695079..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * SynchException is used to encapsulate exceptions with the data elements that caused them in order to send exceptions - * back from the consumers of a {@link BatchSynchQueue} to producers. The underlying exception should be retrieved from - * the {@link #getCause} method. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Encapsulate a data element and exception. - * </table> - */ -public class SynchException extends Exception -{ - /** Holds the data element that is in error. */ - private Object element; - - /** - * Creates a new BaseApplicationException object. - * - * @param message The exception message. - * @param cause The underlying throwable cause. This may be null. - */ - public SynchException(String message, Throwable cause, Object element) - { - super(message, cause); - - // Keep the data element that was in error. - this.element = element; - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java deleted file mode 100644 index 95833f398a..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.util.LinkedList; -import java.util.Queue; - -/** - * SynchQueue completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying - * queue as a linked list. This uses FIFO ordering for the queue and allows the queue to grow to accomodate more - * elements as needed. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Provide linked list FIFO queue to create a batch synched queue around. - * </table> - */ -public class SynchQueue<E> extends BatchSynchQueueBase<E> -{ - /** - * Returns an empty queue, implemented as a linked list. - * - * @return An empty queue, implemented as a linked list. - */ - protected <T> Queue<T> createQueue() - { - return new LinkedList<T>(); - } -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java deleted file mode 100644 index fd740c20cd..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * SynchRecord associates a data item from a {@link BatchSynchQueue} with its producer. This enables the data item data - * item to be put back on the queue without unblocking its producer, or to send exceptions to the producer. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Get the underlying data element. - * <tr><td> Put the data element back on the queue without unblocking its producer. - * <tr><td> Send and exception to the data elements producer. - * </table> - */ -public interface SynchRecord<E> -{ - /** - * Gets the data element contained by this record. - * - * @return The data element contained by this record. - */ - public E getElement(); - - /** - * Tells the synch queue to put this element back onto the queue instead of releasing its producer. - * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element - * has already been unblocked. - */ - public void reQueue(); - - /** - * Immediately releases the producer of this data record. Consumers can bring the synchronization time of - * producers to a minimum by using this method to release them at the earliest possible moment when batch - * consuming records from sychronized producers. - */ - public void releaseImmediately(); - - /** - * Tells the synch queue to raise an exception with this elements producer. The exception is not raised immediately - * but upon calling the {@link SynchRef#unblockProducers()} method. The exception will be wrapped in a - * {@link SynchException} before it is raised on the producer. - * - * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used - * because the exception is to be passed onto a different thread. - * - * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element - * has already been unblocked. - * - * @param e The exception to raise on the producer. - */ - public void inError(Exception e); -} diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java deleted file mode 100644 index efe2344c06..0000000000 --- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.apache.qpid.util.concurrent; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/** - * A SynchRef is an interface which is returned from the synchronous take and drain methods of {@link BatchSynchQueue}, - * allowing call-backs to be made against the synchronizing strucutre. It allows the consumer to communicate when it - * wants producers that have their data taken to be unblocked. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities - * <tr><td> Report number of records returned by a taking operation. - * <tr><td> Provide call-back to release producers of taken records. - * </table> - */ -public interface SynchRef -{ - /** - * Reports the number of records taken by the take or drain operation. - * - * @return The number of records taken by the take or drain operation. - */ - public int getNumRecords(); - - /** - * Any producers that have had their data elements taken from the queue but have not been unblocked are - * unblocked when this method is called. The exception to this is producers that have had their data put back - * onto the queue by a consumer. Producers that have had exceptions for their data items registered by consumers - * will be unblocked but will not return from their put call normally, but with an exception instead. - */ - public void unblockProducers(); -} |