summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-29 13:05:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-29 13:05:07 +0000
commit22b64fd33e448ffc54134b28c7226342f54b4683 (patch)
tree89a27bba6ddb8674b02a810775cfb148d6de8d67 /java/common
parent9bb75aae85d397bd9dd98c8230de3e576ce97572 (diff)
downloadqpid-python-22b64fd33e448ffc54134b28c7226342f54b4683.tar.gz
QPID-3789 : [Java] code tidyups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1237273 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/Type.tpl21
-rw-r--r--java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQStoreException.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java17
-rw-r--r--java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java124
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/thread/Threading.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java14
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Transport.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/util/Functions.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/URLHelper.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java258
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java77
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/FileUtils.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java75
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/Serial.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/Strings.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/UUIDs.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java34
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java122
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java837
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java128
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java35
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java50
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java52
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java48
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java74
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java51
50 files changed, 208 insertions, 1975 deletions
diff --git a/java/common/Type.tpl b/java/common/Type.tpl
index 7f9cfee268..4635c4e367 100644
--- a/java/common/Type.tpl
+++ b/java/common/Type.tpl
@@ -56,9 +56,9 @@ for t in types:
out(" $name((byte) $code, $width, $fixed)")
};
- public byte code;
- public int width;
- public boolean fixed;
+ private final byte code;
+ private final int width;
+ private final boolean fixed;
Type(byte code, int width, boolean fixed)
{
@@ -67,6 +67,21 @@ for t in types:
this.fixed = fixed;
}
+ public byte getCode()
+ {
+ return code;
+ }
+
+ public int getWidth()
+ {
+ return width;
+ }
+
+ public boolean isFixed()
+ {
+ return fixed;
+ }
+
public static Type get(byte code)
{
switch (code)
diff --git a/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java b/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java
index 73ee747c07..1029544fd6 100644
--- a/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java
+++ b/java/common/src/main/java/org/apache/configuration/PropertyNameResolver.java
@@ -32,7 +32,7 @@ public class PropertyNameResolver
}
private static Map<Class<?>,Accessor> accessors = new HashMap<Class<?>,Accessor>();
- protected Map<String,QpidProperty> properties;
+ private Map<String,QpidProperty> properties;
private static class BooleanAccessor implements Accessor
{
diff --git a/java/common/src/main/java/org/apache/qpid/AMQStoreException.java b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java
index 8389fe5efa..45aa36a20b 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQStoreException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQStoreException.java
@@ -20,10 +20,8 @@
*/
package org.apache.qpid;
-import java.sql.SQLException;
-
/**
- * StoreException is a specific type of internal error relating to errors in the message store, such as {@link SQLException}.
+ * StoreException is a specific type of internal error relating to errors in the message store, such as {@link java.sql.SQLException}.
*/
public class AMQStoreException extends AMQInternalException
{
diff --git a/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java b/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
index aa262bdde5..dd94f8251b 100644
--- a/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
+++ b/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
@@ -26,6 +26,10 @@ package org.apache.qpid.common;
*/
public final class ServerPropertyNames
{
+ private ServerPropertyNames()
+ {
+ }
+
/**
* Server property: federation tag UUID
*/
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index a36e7c214e..69a6602baf 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -116,16 +116,7 @@ public class ClientProperties
*/
public static final String REJECT_BEHAVIOUR_PROP_NAME = "qpid.reject.behaviour";
- /*
- public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
- QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
- public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
- QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
- public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
- QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
+ private ClientProperties()
+ {
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
index 6e2b25fb2c..19e998733a 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
@@ -40,6 +40,10 @@ import java.util.Iterator;
*/
public class PropertyUtils
{
+ private PropertyUtils()
+ {
+ }
+
/**
* Given a string that contains substrings of the form <code>${xxx}</code>, looks up the valuea of 'xxx' as a
* system properties and substitutes tham back into the original string, to provide a property value expanded
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
index 9c0aaaec89..e88c7784a2 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
@@ -24,7 +24,7 @@ abstract class QpidProperty<T>
{
private T defValue;
private String[] names;
- protected Accessor accessor;
+ private Accessor accessor;
QpidProperty(T defValue, String... names)
{
@@ -101,7 +101,12 @@ abstract class QpidProperty<T>
{
return new QpidStringProperty(accessor,defaultValue, names);
}
-
+
+ protected Accessor getAccessor()
+ {
+ return accessor;
+ }
+
static class QpidBooleanProperty extends QpidProperty<Boolean>
{
QpidBooleanProperty(Boolean defValue, String... names)
@@ -117,7 +122,7 @@ abstract class QpidProperty<T>
@Override
protected Boolean getByName(String name)
{
- return accessor.getBoolean(name);
+ return getAccessor().getBoolean(name);
}
}
@@ -136,7 +141,7 @@ abstract class QpidProperty<T>
@Override
protected Integer getByName(String name)
{
- return accessor.getInt(name);
+ return getAccessor().getInt(name);
}
}
@@ -155,7 +160,7 @@ abstract class QpidProperty<T>
@Override
protected Long getByName(String name)
{
- return accessor.getLong(name);
+ return getAccessor().getLong(name);
}
}
@@ -174,7 +179,7 @@ abstract class QpidProperty<T>
@Override
protected String getByName(String name)
{
- return accessor.getString(name);
+ return getAccessor().getString(name);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
index 1989ade4ac..975ec4daca 100644
--- a/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
+++ b/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
@@ -36,6 +36,10 @@ import org.apache.qpid.framing.AMQShortString;
*/
public class ExchangeDefaults
{
+ private ExchangeDefaults()
+ {
+ }
+
/** The default direct exchange, which is a special internal exchange that cannot be explicitly bound to. */
public static final AMQShortString DEFAULT_EXCHANGE_NAME = new AMQShortString("<<default>>");
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index f5fbbac42b..966a03605c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -52,10 +52,6 @@ public interface AMQMethodBody extends AMQBody
public void writePayload(DataOutput buffer) throws IOException;
- //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
-
- //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
-
public AMQFrame generateFrame(int channelId);
public String toString();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
index 3951f17681..85870e68c5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -321,7 +321,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt
{
final int size = length();
- //buffer.setAutoExpand(true);
buffer.writeByte(size);
buffer.write(_data, _offset, size);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java
index a07fd78c8c..94a7d127b3 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java
@@ -27,6 +27,10 @@ public class AMQTypeMap
{
public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<Byte, AMQType>();
+ private AMQTypeMap()
+ {
+ }
+
static
{
for(AMQType type : AMQType.values())
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index f942d33389..a6883917d5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -294,85 +294,83 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti
private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException
{
- // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ int headersOffset = 0;
- int headersOffset = 0;
-
- if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
- {
- _contentType = buffer.readAMQShortString();
- headersOffset += EncodingUtils.encodedShortStringLength(_contentType);
- }
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ _contentType = buffer.readAMQShortString();
+ headersOffset += EncodingUtils.encodedShortStringLength(_contentType);
+ }
- if ((_propertyFlags & ENCODING_MASK) != 0)
- {
- _encoding = buffer.readAMQShortString();
- headersOffset += EncodingUtils.encodedShortStringLength(_encoding);
- }
+ if ((_propertyFlags & ENCODING_MASK) != 0)
+ {
+ _encoding = buffer.readAMQShortString();
+ headersOffset += EncodingUtils.encodedShortStringLength(_encoding);
+ }
- if ((_propertyFlags & HEADERS_MASK) != 0)
- {
- long length = EncodingUtils.readUnsignedInteger(buffer);
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ long length = EncodingUtils.readUnsignedInteger(buffer);
- _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length);
+ _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length);
- buffer.skipBytes((int)length);
- }
+ buffer.skipBytes((int)length);
+ }
- if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
- {
- _deliveryMode = buffer.readByte();
- }
+ if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+ {
+ _deliveryMode = buffer.readByte();
+ }
- if ((_propertyFlags & PRIORITY_MASK) != 0)
- {
- _priority = buffer.readByte();
- }
+ if ((_propertyFlags & PRIORITY_MASK) != 0)
+ {
+ _priority = buffer.readByte();
+ }
- if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
- {
- _correlationId = buffer.readAMQShortString();
- }
+ if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+ {
+ _correlationId = buffer.readAMQShortString();
+ }
- if ((_propertyFlags & REPLY_TO_MASK) != 0)
- {
- _replyTo = buffer.readAMQShortString();
- }
+ if ((_propertyFlags & REPLY_TO_MASK) != 0)
+ {
+ _replyTo = buffer.readAMQShortString();
+ }
- if ((_propertyFlags & EXPIRATION_MASK) != 0)
- {
- _expiration = EncodingUtils.readLongAsShortString(buffer);
- }
+ if ((_propertyFlags & EXPIRATION_MASK) != 0)
+ {
+ _expiration = EncodingUtils.readLongAsShortString(buffer);
+ }
- if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
- {
- _messageId = buffer.readAMQShortString();
- }
+ if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+ {
+ _messageId = buffer.readAMQShortString();
+ }
- if ((_propertyFlags & TIMESTAMP_MASK) != 0)
- {
- _timestamp = EncodingUtils.readTimestamp(buffer);
- }
+ if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+ {
+ _timestamp = EncodingUtils.readTimestamp(buffer);
+ }
- if ((_propertyFlags & TYPE_MASK) != 0)
- {
- _type = buffer.readAMQShortString();
- }
+ if ((_propertyFlags & TYPE_MASK) != 0)
+ {
+ _type = buffer.readAMQShortString();
+ }
- if ((_propertyFlags & USER_ID_MASK) != 0)
- {
- _userId = buffer.readAMQShortString();
- }
+ if ((_propertyFlags & USER_ID_MASK) != 0)
+ {
+ _userId = buffer.readAMQShortString();
+ }
- if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
- {
- _appId = buffer.readAMQShortString();
- }
+ if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+ {
+ _appId = buffer.readAMQShortString();
+ }
- if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
- {
- _clusterId = buffer.readAMQShortString();
- }
+ if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+ {
+ _clusterId = buffer.readAMQShortString();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
index 538e0e1327..6d05c2fd3a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -41,6 +41,10 @@ public class EncodingUtils
public static final int SIZEOF_UNSIGNED_INT = 4;
private static final boolean[] ALL_FALSE_ARRAY = new boolean[8];
+ private EncodingUtils()
+ {
+ }
+
public static int encodedShortStringLength(String s)
{
if (s == null)
@@ -115,7 +119,7 @@ public class EncodingUtils
{
return len + 6 + encodedShortStringLength((short) (i / 1000000));
}
- else // if (i > 99999)
+ else // if i > 99999
{
return len + 5 + encodedShortStringLength((short) (i / 100000));
}
@@ -737,8 +741,6 @@ public class EncodingUtils
public static long readTimestamp(DataInput buffer) throws IOException
{
- // Discard msb from AMQ timestamp
- // buffer.getUnsignedInt();
return buffer.readLong();
}
@@ -803,8 +805,6 @@ public class EncodingUtils
byte[] from = new byte[size];
- // Is this not the same.
- // bb.get(from, 0, length);
for (int i = 0; i < size; i++)
{
from[i] = bb.get(i);
@@ -959,7 +959,6 @@ public class EncodingUtils
else
{
// really writing out unsigned byte
- //buffer.put((byte) 0);
writeUnsignedInteger(buffer, 0L);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
index b2b78ebcef..a2d4d27396 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
@@ -25,6 +25,10 @@ import java.io.IOException;
public class FieldTableFactory
{
+ private FieldTableFactory()
+ {
+ }
+
public static FieldTable newFieldTable()
{
return new FieldTable();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 3c1e67f488..0a06f0f1e9 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -43,9 +43,6 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
private final byte _protocolMajor;
private final byte _protocolMinor;
-
-// public ProtocolInitiation() {}
-
public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor)
{
_protocolHeader = protocolHeader;
diff --git a/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java b/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java
index 225f7c11e9..c5c2cacd1a 100644
--- a/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java
+++ b/java/common/src/main/java/org/apache/qpid/messaging/util/JAddr.java
@@ -35,6 +35,9 @@ import java.util.List;
public class JAddr
{
+ private JAddr()
+ {
+ }
public static final void main(String[] args) throws Exception
{
diff --git a/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java b/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java
index ef6c724371..2681893482 100644
--- a/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java
+++ b/java/common/src/main/java/org/apache/qpid/messaging/util/PyPrint.java
@@ -33,6 +33,9 @@ import java.util.Map;
public class PyPrint
{
+ private PyPrint()
+ {
+ }
public static String pprint(Object obj)
{
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 260c17f193..185c01d3df 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -51,7 +51,6 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
*
* @return The method registry for a specific version of the AMQP.
*/
-// public VersionSpecificRegistry getRegistry();
MethodRegistry getMethodRegistry();
diff --git a/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/java/common/src/main/java/org/apache/qpid/thread/Threading.java
index 603e8a7441..265b336157 100644
--- a/java/common/src/main/java/org/apache/qpid/thread/Threading.java
+++ b/java/common/src/main/java/org/apache/qpid/thread/Threading.java
@@ -24,7 +24,11 @@ package org.apache.qpid.thread;
public final class Threading
{
private static ThreadFactory threadFactory;
-
+
+ private Threading()
+ {
+ }
+
static {
try
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
index 0f19d7e2b2..0f049aba8e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSetFactory.java
@@ -22,6 +22,10 @@ package org.apache.qpid.transport;
public class RangeSetFactory
{
+ private RangeSetFactory()
+ {
+ }
+
public static RangeSet createRangeSet()
{
return new RangeSetImpl();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 6784bc2279..d450746eaa 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -410,7 +410,6 @@ public class Session extends SessionInvoker
log.debug("ID: [%s] %s", this.channel, id);
}
- //if ((id % 65536) == 0)
if ((id & 0xff) == 0)
{
flushProcessed(TIMELY_REPLY);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
index 8ac8093645..27fce6e167 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
@@ -361,13 +361,13 @@ abstract class AbstractDecoder implements Decoder
private long readSize(Type t)
{
- if (t.fixed)
+ if (t.isFixed())
{
- return t.width;
+ return t.getWidth();
}
else
{
- return readSize(t.width);
+ return readSize(t.getWidth());
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
index 5e0207c979..a38c83d4cb 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
@@ -359,7 +359,7 @@ abstract class AbstractEncoder implements Encoder
Object value = entry.getValue();
Type type = encoding(value);
writeStr8(key);
- put(type.code);
+ put(type.getCode());
write(type, value);
}
}
@@ -380,7 +380,7 @@ abstract class AbstractEncoder implements Encoder
for (Object value : list)
{
Type type = encoding(value);
- put(type.code);
+ put(type.getCode());
write(type, value);
}
}
@@ -408,7 +408,7 @@ abstract class AbstractEncoder implements Encoder
type = encoding(array.get(0));
}
- put(type.code);
+ put(type.getCode());
writeUint32(array.size());
@@ -420,18 +420,18 @@ abstract class AbstractEncoder implements Encoder
private void writeSize(Type t, int size)
{
- if (t.fixed)
+ if (t.isFixed())
{
- if (size != t.width)
+ if (size != t.getWidth())
{
throw new IllegalArgumentException
- ("size does not match fixed width " + t.width + ": " +
+ ("size does not match fixed width " + t.getWidth() + ": " +
size);
}
}
else
{
- writeSize(t.width, size);
+ writeSize(t.getWidth(), size);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index 6a6a2b75d1..55ba95ad75 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -54,6 +54,10 @@ public class Transport
OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
}
+ private Transport()
+ {
+ }
+
public static IncomingNetworkTransport getIncomingTransportInstance()
{
return (IncomingNetworkTransport) loadTransportClass(
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
index 6388762026..d51491862b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
@@ -37,6 +37,10 @@ import java.nio.ByteBuffer;
public class SecurityLayerFactory
{
+ private SecurityLayerFactory()
+ {
+ }
+
public static SecurityLayer newInstance(ConnectionSettings settings)
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
index 6c615c05b6..625e1a77c2 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
@@ -30,10 +30,10 @@ import javax.security.sasl.SaslClient;
public abstract class SASLEncryptor implements ConnectionListener
{
- protected SaslClient saslClient;
- protected boolean securityLayerEstablished = false;
- protected int sendBuffSize;
- protected int recvBuffSize;
+ private SaslClient saslClient;
+ private boolean securityLayerEstablished = false;
+ private int sendBuffSize;
+ private int recvBuffSize;
public boolean isSecurityLayerEstablished()
{
@@ -61,4 +61,19 @@ public abstract class SASLEncryptor implements ConnectionListener
public void closed(Connection conn) {}
public abstract void securityLayerEstablished();
+
+ public SaslClient getSaslClient()
+ {
+ return saslClient;
+ }
+
+ public int getSendBuffSize()
+ {
+ return sendBuffSize;
+ }
+
+ public int getRecvBuffSize()
+ {
+ return recvBuffSize;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
index a9321fbec1..a100b96412 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
@@ -56,11 +56,11 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer>
{
while (buf.hasRemaining())
{
- int length = Math.min(buf.remaining(),recvBuffSize);
+ int length = Math.min(buf.remaining(), getRecvBuffSize());
buf.get(netData, 0, length);
try
{
- byte[] out = saslClient.unwrap(netData, 0, length);
+ byte[] out = getSaslClient().unwrap(netData, 0, length);
delegate.received(ByteBuffer.wrap(out));
}
catch (SaslException e)
@@ -77,7 +77,7 @@ public class SASLReceiver extends SASLEncryptor implements Receiver<ByteBuffer>
public void securityLayerEstablished()
{
- netData = new byte[recvBuffSize];
+ netData = new byte[getRecvBuffSize()];
log.debug("SASL Security Layer Established");
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index ce80f32eb8..61d54a8386 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
- protected Sender<ByteBuffer> delegate;
+ private Sender<ByteBuffer> delegate;
private byte[] appData;
private final AtomicBoolean closed = new AtomicBoolean(false);
private static final Logger log = Logger.get(SASLSender.class);
@@ -52,7 +52,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
{
try
{
- saslClient.dispose();
+ getSaslClient().dispose();
}
catch (SaslException e)
{
@@ -78,14 +78,14 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
{
while (buf.hasRemaining())
{
- int length = Math.min(buf.remaining(),sendBuffSize);
- log.debug("sendBuffSize %s", sendBuffSize);
+ int length = Math.min(buf.remaining(), getSendBuffSize());
+ log.debug("sendBuffSize %s", getSendBuffSize());
log.debug("buf.remaining() %s", buf.remaining());
buf.get(appData, 0, length);
try
{
- byte[] out = saslClient.wrap(appData, 0, length);
+ byte[] out = getSaslClient().wrap(appData, 0, length);
log.debug("out.length %s", out.length);
delegate.send(ByteBuffer.wrap(out));
@@ -110,7 +110,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
public void securityLayerEstablished()
{
- appData = new byte[sendBuffSize];
+ appData = new byte[getSendBuffSize()];
log.debug("SASL Security Layer Established");
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
index 8f01719935..71a73db71f 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
@@ -38,7 +38,11 @@ import java.security.cert.X509Certificate;
public class SSLUtil
{
private static final Logger log = Logger.get(SSLUtil.class);
-
+
+ private SSLUtil()
+ {
+ }
+
public static void verifyHostname(SSLEngine engine,String hostnameExpected)
{
try
diff --git a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
index d76d0e249f..bd3e9bbcbc 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
@@ -31,8 +31,11 @@ import static java.lang.Math.min;
* @author Rafael H. Schloming
*/
-public class Functions
+public final class Functions
{
+ private Functions()
+ {
+ }
public static final int mod(int n, int m)
{
diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java
index 6da468d1b8..fe7b01761b 100644
--- a/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java
+++ b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java
@@ -184,7 +184,7 @@ public class BindingURLParser
char nextChar = _url[_index];
// check for the following special cases.
- // "myQueue?durable='true'" or just "myQueue";
+ // "myQueue?durable='true'" or just "myQueue"
StringBuilder builder = new StringBuilder();
while (nextChar != COLON_CHAR && nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR)
diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
index 94d97d3023..8516e7fa0e 100644
--- a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
+++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
@@ -28,6 +28,10 @@ public class URLHelper
public static final char ALTERNATIVE_OPTION_SEPARATOR = ',';
public static final char BROKER_SEPARATOR = ';';
+ private URLHelper()
+ {
+ }
+
public static void parseOptions(Map<String, String> optionMap, String options) throws URLSyntaxException
{
if ((options == null) || (options.indexOf('=') == -1))
diff --git a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
index e4cebb6926..3d17bbf6ea 100644
--- a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
+++ b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
@@ -329,9 +329,6 @@ public class CommandLineParser
expectingArgs = true;
optionExpectingArgs = matchedOption;
- // In the mean time set this options argument to the empty string in case no argument is ever
- // supplied.
- // options.put(matchedOption, "");
}
// Check if the option was matched on its own and is a flag in which case set that flag.
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
deleted file mode 100644
index 0f9bd64233..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E>
-{
- private static final Logger _logger = LoggerFactory.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class);
-
- protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>();
-
- protected AtomicInteger _messageHeadSize = new AtomicInteger(0);
-
- @Override
- public int size()
- {
- return super.size() + _messageHeadSize.get();
- }
-
- public int headSize()
- {
- return _messageHeadSize.get();
- }
-
- @Override
- public E poll()
- {
- if (_messageHead.isEmpty())
- {
- return super.poll();
- }
- else
- {
- E e = _messageHead.poll();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Providing item(" + e + ")from message head");
- }
-
- if (e != null)
- {
- _messageHeadSize.decrementAndGet();
- }
-
- return e;
- }
- }
-
- @Override
- public boolean remove(Object o)
- {
-
- if (_messageHead.isEmpty())
- {
- return super.remove(o);
- }
- else
- {
- if (_messageHead.remove(o))
- {
- _messageHeadSize.decrementAndGet();
-
- return true;
- }
-
- return super.remove(o);
- }
- }
-
- @Override
- public boolean removeAll(Collection<?> c)
- {
- if (_messageHead.isEmpty())
- {
- return super.removeAll(c);
- }
- else
- {
- // fixme this is super.removeAll but iterator here doesn't work
- // we need to be able to correctly decrement _messageHeadSize
- // boolean modified = false;
- // Iterator<?> e = iterator();
- // while (e.hasNext())
- // {
- // if (c.contains(e.next()))
- // {
- // e.remove();
- // modified = true;
- // _size.decrementAndGet();
- // }
- // }
- // return modified;
-
- throw new RuntimeException("Not implemented");
- }
- }
-
- @Override
- public boolean isEmpty()
- {
- return (_messageHead.isEmpty() && super.isEmpty());
- }
-
- @Override
- public void clear()
- {
- super.clear();
- _messageHead.clear();
- }
-
- @Override
- public boolean contains(Object o)
- {
- return _messageHead.contains(o) || super.contains(o);
- }
-
- @Override
- public boolean containsAll(Collection<?> o)
- {
- return _messageHead.containsAll(o) || super.containsAll(o);
- }
-
- @Override
- public E element()
- {
- if (_messageHead.isEmpty())
- {
- return super.element();
- }
- else
- {
- return _messageHead.element();
- }
- }
-
- @Override
- public E peek()
- {
- if (_messageHead.isEmpty())
- {
- return super.peek();
- }
- else
- {
- E o = _messageHead.peek();
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Peeking item (" + o + ") from message head");
- }
-
- return o;
- }
-
- }
-
- @Override
- public Iterator<E> iterator()
- {
- final Iterator<E> mainMessageIterator = super.iterator();
-
- return new Iterator<E>()
- {
- private final Iterator<E> _headIterator = _messageHead.iterator();
- private final Iterator<E> _mainIterator = mainMessageIterator;
-
- private Iterator<E> last;
-
- public boolean hasNext()
- {
- return _headIterator.hasNext() || _mainIterator.hasNext();
- }
-
- public E next()
- {
- if (_headIterator.hasNext())
- {
- last = _headIterator;
-
- return _headIterator.next();
- }
- else
- {
- last = _mainIterator;
-
- return _mainIterator.next();
- }
- }
-
- public void remove()
- {
- last.remove();
- if(last == _mainIterator)
- {
- decrementSize();
- }
- else
- {
- _messageHeadSize.decrementAndGet();
- }
- }
- };
- }
-
- @Override
- public boolean retainAll(Collection<?> c)
- {
- throw new RuntimeException("Not Implemented");
- }
-
- @Override
- public Object[] toArray()
- {
- throw new RuntimeException("Not Implemented");
- }
-
- public boolean pushHead(E o)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Adding item(" + o + ") to head of queue");
- }
-
- if (_messageHead.offer(o))
- {
- _messageHeadSize.incrementAndGet();
-
- return true;
- }
-
- return false;
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java
deleted file mode 100644
index dd91a067b5..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueAtomicSize.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.util;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ConcurrentLinkedQueueAtomicSize<E> extends ConcurrentLinkedQueue<E>
-{
- private AtomicInteger _size = new AtomicInteger(0);
-
- public int size()
- {
- return _size.get();
- }
-
- protected final void decrementSize()
- {
- _size.decrementAndGet();
- }
-
-
-
- public boolean offer(E o)
- {
-
- if (super.offer(o))
- {
- _size.incrementAndGet();
- return true;
- }
-
- return false;
- }
-
- public E poll()
- {
- E e = super.poll();
-
- if (e != null)
- {
- _size.decrementAndGet();
- }
-
- return e;
- }
-
- @Override
- public boolean remove(Object o)
- {
- if (super.remove(o))
- {
- _size.decrementAndGet();
- return true;
- }
-
- return false;
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
index ac8e3da3c2..2d3e321812 100644
--- a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
@@ -46,6 +46,10 @@ import java.util.List;
*/
public class FileUtils
{
+ private FileUtils()
+ {
+ }
+
/**
* Reads a text file as a string.
*
diff --git a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java b/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java
deleted file mode 100644
index 93266f2486..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.util;
-
-/**
- * Contains pretty printing convenienve methods for producing formatted logging output, mostly for debugging purposes.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- *
- * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays.
- */
-public class PrettyPrintingUtils
-{
- /**
- * Pretty prints an array of ints as a string.
- *
- * @param array The array to pretty print.
- *
- * @return The pretty printed string.
- */
- public static String printArray(int[] array)
- {
- StringBuilder result = new StringBuilder("[");
- for (int i = 0; i < array.length; i++)
- {
- result.append(array[i])
- .append((i < (array.length - 1)) ? ", " : "");
- }
-
- result.append(']');
-
- return result.toString();
- }
-
- /**
- * Pretty prints an array of strings as a string.
- *
- * @param array The array to pretty print.
- *
- * @return The pretty printed string.
- */
- public static String printArray(String[] array)
- {
- String result = "[";
- for (int i = 0; i < array.length; i++)
- {
- result += array[i];
- result += (i < (array.length - 1)) ? ", " : "";
- }
-
- result += "]";
-
- return result;
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/Serial.java b/java/common/src/main/java/org/apache/qpid/util/Serial.java
index 7961e4ba3c..451d5d60eb 100644
--- a/java/common/src/main/java/org/apache/qpid/util/Serial.java
+++ b/java/common/src/main/java/org/apache/qpid/util/Serial.java
@@ -30,6 +30,9 @@ import java.util.Comparator;
public class Serial
{
+ private Serial()
+ {
+ }
public static final Comparator<Integer> COMPARATOR = new Comparator<Integer>()
{
diff --git a/java/common/src/main/java/org/apache/qpid/util/Strings.java b/java/common/src/main/java/org/apache/qpid/util/Strings.java
index da3aab3192..f2d51ccfde 100644
--- a/java/common/src/main/java/org/apache/qpid/util/Strings.java
+++ b/java/common/src/main/java/org/apache/qpid/util/Strings.java
@@ -36,6 +36,9 @@ import java.util.regex.Pattern;
public final class Strings
{
+ private Strings()
+ {
+ }
private static final byte[] EMPTY = new byte[0];
diff --git a/java/common/src/main/java/org/apache/qpid/util/UUIDs.java b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java
index 4bf6b7f0a2..d9b2dd8413 100644
--- a/java/common/src/main/java/org/apache/qpid/util/UUIDs.java
+++ b/java/common/src/main/java/org/apache/qpid/util/UUIDs.java
@@ -28,6 +28,9 @@ package org.apache.qpid.util;
public final class UUIDs
{
+ private UUIDs()
+ {
+ }
public static final UUIDGen newGenerator()
{
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java
deleted file mode 100644
index e0c0337898..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/AlreadyUnblockedException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/**
- * Used to signal that a data element and its producer cannot be requeued or sent an error message when using a
- * {@link BatchSynchQueue} because the producer has already been unblocked by an unblocking take on the queue.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Signal that an unblocking take has already occurred.
- * </table>
- */
-public class AlreadyUnblockedException extends RuntimeException
-{ }
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java
deleted file mode 100644
index 63d8f77edb..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueue.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * BatchSynchQueue is an abstraction of the classic producer/consumer buffer pattern for thread interaction. In this
- * pattern threads can deposit data onto a buffer whilst other threads take data from the buffer and perform usefull
- * work with it. A BatchSynchQueue adds to this the possibility that producers can be blocked until their data is
- * consumed or until a consumer chooses to release the producer some time after consuming the data from the queue.
- *
- * <p>There are a number of possible advantages to using this technique when compared with having the producers
- * processing their own data:
- *
- * <ul>
- * <li>Data may be deposited asynchronously in the buffer allowing the producers to continue running.</li>
- * <li>Data may be deposited synchronously in the buffer so that producers wait until their data has been processed
- * before being allowed to continue.</li>
- * <li>Variable rates of production/consumption can be smoothed over by the buffer as it provides space in memory to
- * hold data between production and consumption.</li>
- * <li>Consumers may be able to batch data as they consume it leading to more efficient consumption over
- * individual data item consumption where latency associated with the consume operation can be ammortized.
- * For example, it may be possibly to ammortize the cost of a disk seek over many producers.</li>
- * <li>Data from seperate threads can be combined together in the buffer, providing a convenient way of spreading work
- * amongst many workers and gathering the results together again.</li>
- * <li>Different types of queue can be used to hold the buffer, resulting in different processing orders. For example,
- * lifo, fifo, priority heap, etc.</li>
- * </ul>
- *
- * <p/>The asynchronous type of producer/consumer buffers is already well supported by the java.util.concurrent package
- * (in Java 5) and there is also a synchronous queue implementation available there too. This interface extends the
- * blocking queue with some more methods for controlling a synchronous blocking queue. In particular it adds additional
- * take methods that can be used to take data from a queue without releasing producers, so that consumers have an
- * opportunity to confirm correct processing of the data before producers are released. It also adds a put method with
- * exceptions so that consumers can signal exception cases back to producers where there are errors in the data.
- *
- * <p/>This type of queue is usefull in situations where consumers can obtain an efficiency gain by batching data
- * from many threads but where synchronous handling of that data is neccessary because producers need to know that
- * their data has been processed before they continue. For example, sending a bundle of messages together, or writing
- * many records to disk at once, may result in improved performance but the originators of the messages or disk records
- * need confirmation that their data has really been sent or saved to disk.
- *
- * <p/>The consumer can put an element back onto the queue or send an error message to the elements producer using the
- * {@link SynchRecord} interface.
- *
- * <p/>The {@link #take()}, {@link #drainTo(java.util.Collection<? super E>)} and
- * {@link #drainTo(java.util.Collection<? super E>, int)} methods from {@link BlockingQueue} should behave as if they
- * have been called with unblock set to false. That is they take elements from the queue but leave the producers
- * blocked. These methods do not return collections of {@link SynchRecord}s so they do not supply an interface through
- * which errors or re-queuings can be applied. If these methods are used then the consumer must succesfully process
- * all the records it takes.
- *
- * <p/>The {@link #put} method should silently swallow any exceptions that consumers attempt to return to the caller.
- * In order to handle exceptions the {@link #tryPut} method must be used.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Handle synchronous puts, with possible exceptions.
- * <tr><td> Allow consumers to take many records from a queue in a batch.
- * <tr><td> Allow consumers to decide when to unblock synchronous producers.
- * </table>
- */
-public interface BatchSynchQueue<E> extends BlockingQueue<E>
-{
- /**
- * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
- * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
- *
- * @param e The data element to put into the queue.
- *
- * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
- * on its entry in the queue being consumed.
- * @throws SynchException If a consumer encounters an error whilst processing the data element.
- */
- public void tryPut(E e) throws InterruptedException, SynchException;
-
- /**
- * Takes all available data items from the queue or blocks until some become available. The returned items
- * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
- * producers, where the producers are still blocked.
- *
- * @param c The collection to drain the data items into.
- * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
- *
- * @return A count of the number of elements that were drained from the queue.
- */
- public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock);
-
- /**
- * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
- * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
- * producers, where the producers are still blocked.
- *
- * @param c The collection to drain the data items into.
- * @param maxElements The maximum number of elements to drain.
- * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
- *
- * @return A count of the number of elements that were drained from the queue.
- */
- public SynchRef drainTo(Collection<SynchRecord<E>> c, int maxElements, boolean unblock);
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
deleted file mode 100644
index 1d5f3b5e46..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/BatchSynchQueueBase.java
+++ /dev/null
@@ -1,837 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.AbstractQueue;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * Synchronous/Asynchronous puts. Asynchronous is easiest, just wait till can write to queue and deposit data.
- * Synchronous is harder. Deposit data, but then must wait until deposited element/elements are taken before being
- * allowed to unblock and continue. Consumer needs some options here too. Can just get the data from the buffer and
- * allow any producers unblocked as a result to continue, or can get data but continue blocking while the data is
- * processed before sending a message to do the unblocking. Synch/Asynch mode to be controlled by a switch.
- * Unblocking/not unblocking during consumer processing to be controlled by the consumers calls.
- *
- * <p/>Implementing sub-classes only need to supply an implementation of a queue to produce a valid concrete
- * implementation of this. This queue is only accessed through the methods {@link #insert}, {@link #extract},
- * {@link #getBufferCapacity()}, {@link #peekAtBufferHead()}. An implementation can override these methods to implement
- * the buffer other than by a queue, for example, by using an array.
- *
- * <p/>Normal queue methods to work asynchronously.
- * <p/>Put, take and drain methods from the BlockingQueue interface work synchronously but unblock producers immediately
- * when their data is taken.
- * <p/>The additional put, take and drain methods from the BatchSynchQueue interface work synchronously and provide the
- * option to keep producers blocked until the consumer decides to release them.
- *
- * <p/>Removed take method that keeps producers blocked as it is pointless. Essentially it reduces this class to
- * synchronous processing of individual data items, which negates the point of the hand-off design. The efficiency
- * gain of the hand off design comes in being able to batch consume requests, ammortizing latency (such as caused by io)
- * accross many producers. The only advantage of the single blocking take method is that it did take advantage of the
- * queue ordering, which ma be usefull, for example to apply a priority ordering amongst producers. This is also an
- * advantage over the java.util.concurrent.SynchronousQueue which doesn't have a backing queue which can be used to
- * apply orderings. If a single item take is really needed can just use the drainTo method with a maximum of one item.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-public abstract class BatchSynchQueueBase<E> extends AbstractQueue<E> implements BatchSynchQueue<E>
-{
- /** Used for logging. */
- private static final Logger log = LoggerFactory.getLogger(BatchSynchQueueBase.class);
-
- /** Holds a reference to the queue implementation that holds the buffer. */
- private Queue<SynchRecordImpl<E>> buffer;
-
- /** Holds the number of items in the queue */
- private int count;
-
- /** Main lock guarding all access */
- private ReentrantLock lock;
-
- /** Condition for waiting takes */
- private Condition notEmpty;
-
- /** Condition for waiting puts */
- private Condition notFull;
-
- /**
- * Creates a batch synch queue without fair thread scheduling.
- */
- public BatchSynchQueueBase()
- {
- this(false);
- }
-
- /**
- * Ensures that the underlying buffer implementation is created.
- *
- * @param fair <tt>true</tt> if fairness is to be applied to threads waiting to access the buffer.
- */
- public BatchSynchQueueBase(boolean fair)
- {
- buffer = this.createQueue();
-
- // Create the buffer lock with the fairness flag set accordingly.
- lock = new ReentrantLock(fair);
-
- // Create the non-empty and non-full condition monitors on the buffer lock.
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
-
- /**
- * Returns an iterator over the elements contained in this collection.
- *
- * @return An iterator over the elements contained in this collection.
- */
- public Iterator<E> iterator()
- {
- throw new RuntimeException("Not implemented.");
- }
-
- /**
- * Returns the number of elements in this collection. If the collection contains more than
- * <tt>Integer.MAX_VALUE</tt> elements, returns <tt>Integer.MAX_VALUE</tt>.
- *
- * @return The number of elements in this collection.
- */
- public int size()
- {
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return count;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Inserts the specified element into this queue, if possible. When using queues that may impose insertion
- * restrictions (for example capacity bounds), method <tt>offer</tt> is generally preferable to method
- * {@link java.util.Collection#add}, which can fail to insert an element only by throwing an exception.
- *
- * @param e The element to insert.
- *
- * @return <tt>true</tt> if it was possible to add the element to this queue, else <tt>false</tt>
- */
- public boolean offer(E e)
- {
- if (e == null)
- {
- throw new NullPointerException();
- }
-
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return insert(e, false);
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Inserts the specified element into this queue, waiting if necessary up to the specified wait time for space to
- * become available.
- *
- * @param e The element to add.
- * @param timeout How long to wait before giving up, in units of <tt>unit</tt>
- * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
- *
- * @return <tt>true</tt> if successful, or <tt>false</tt> if the specified waiting time elapses before space is
- * available.
- *
- * @throws InterruptedException If interrupted while waiting.
- * @throws NullPointerException If the specified element is <tt>null</tt>.
- */
- public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
- {
- if (e == null)
- {
- throw new NullPointerException();
- }
-
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
-
- long nanos = unit.toNanos(timeout);
-
- try
- {
- do
- {
- if (insert(e, false))
- {
- return true;
- }
-
- try
- {
- nanos = notFull.awaitNanos(nanos);
- }
- catch (InterruptedException ie)
- {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- }
- while (nanos > 0);
-
- return false;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Retrieves and removes the head of this queue, or <tt>null</tt> if this queue is empty.
- *
- * @return The head of this queue, or <tt>null</tt> if this queue is empty.
- */
- public E poll()
- {
- final ReentrantLock lock = this.lock;
-
- lock.lock();
- try
- {
- if (count == 0)
- {
- return null;
- }
-
- E x = extract(true, true).getElement();
-
- return x;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements
- * are present on this queue.
- *
- * @param timeout How long to wait before giving up, in units of <tt>unit</tt>.
- * @param unit A <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
- *
- * @return The head of this queue, or <tt>null</tt> if the specified waiting time elapses before an element is present.
- *
- * @throws InterruptedException If interrupted while waiting.
- */
- public E poll(long timeout, TimeUnit unit) throws InterruptedException
- {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try
- {
- long nanos = unit.toNanos(timeout);
-
- do
- {
- if (count != 0)
- {
- E x = extract(true, true).getElement();
-
- return x;
- }
-
- try
- {
- nanos = notEmpty.awaitNanos(nanos);
- }
- catch (InterruptedException ie)
- {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
- }
- while (nanos > 0);
-
- return null;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Retrieves, but does not remove, the head of this queue, returning <tt>null</tt> if this queue is empty.
- *
- * @return The head of this queue, or <tt>null</tt> if this queue is empty.
- */
- public E peek()
- {
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return peekAtBufferHead();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Returns the number of elements that this queue can ideally (in the absence of memory or resource constraints)
- * accept without blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic limit.
- *
- * <p>Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt> an element will succeed by
- * inspecting <tt>remainingCapacity</tt> because it may be the case that another thread is about to <tt>put</tt>
- * or <tt>take</tt> an element.
- *
- * @return The remaining capacity.
- */
- public int remainingCapacity()
- {
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- return getBufferCapacity() - count;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Adds the specified element to this queue, waiting if necessary for space to become available.
- *
- * <p/>This method delegated to {@link #tryPut} which can raise {@link SynchException}s. If any are raised
- * this method silently ignores them. Use the {@link #tryPut} method directly if you want to catch these
- * exceptions.
- *
- * @param e The element to add.
- *
- * @throws InterruptedException If interrupted while waiting.
- */
- public void put(E e) throws InterruptedException
- {
- try
- {
- tryPut(e);
- }
- catch (SynchException ex)
- {
- // This exception is deliberately ignored. See the method comment for information about this.
- }
- }
-
- /**
- * Tries a synchronous put into the queue. If a consumer encounters an exception condition whilst processing the
- * data that is put, then this is returned to the caller wrapped inside a {@link SynchException}.
- *
- * @param e The data element to put into the queue. Cannot be null.
- *
- * @throws InterruptedException If the thread is interrupted whilst waiting to write to the queue or whilst waiting
- * on its entry in the queue being consumed.
- * @throws SynchException If a consumer encounters an error whilst processing the data element.
- */
- public void tryPut(E e) throws InterruptedException, SynchException
- {
- if (e == null)
- {
- throw new NullPointerException();
- }
-
- // final Queue<E> items = this.buffer;
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
-
- try
- {
- while (count == getBufferCapacity())
- {
- // Release the lock and wait until the queue is not full.
- notFull.await();
- }
- }
- catch (InterruptedException ie)
- {
- notFull.signal(); // propagate to non-interrupted thread
- throw ie;
- }
-
- // There is room in the queue so insert must succeed. Insert into the queu, release the lock and block
- // the producer until its data is taken.
- insert(e, true);
- }
-
- /**
- * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
- * Any producer that has its data element taken by this call will be immediately unblocked. To keep the
- * producer blocked whilst taking just a single item, use the
- * {@link #drainTo(java.util.Collection<org.apache.qpid.util.concurrent.SynchRecord<E>>, int, boolean)}
- * method. There is no take method to do that because there is not usually any advantage in a synchronous hand
- * off design that consumes data one item at a time. It is normal to consume data in chunks to ammortize consumption
- * latencies accross many producers where possible.
- *
- * @return The head of this queue.
- *
- * @throws InterruptedException if interrupted while waiting.
- */
- public E take() throws InterruptedException
- {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
-
- try
- {
- try
- {
- while (count == 0)
- {
- // Release the lock and wait until the queue becomes non-empty.
- notEmpty.await();
- }
- }
- catch (InterruptedException ie)
- {
- notEmpty.signal(); // propagate to non-interrupted thread
- throw ie;
- }
-
- // There is data in the queue so extraction must succeed. Notify any waiting threads that the queue is
- // not full, and unblock the producer that owns the data item that is taken.
- E x = extract(true, true).getElement();
-
- return x;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Removes all available elements from this queue and adds them into the given collection. This operation may be
- * more efficient than repeatedly polling this queue. A failure encountered while attempting to <tt>add</tt> elements
- * to collection <tt>c</tt> may result in elements being in neither, either or both collections when the associated
- * exception is thrown. Attempts to drain a queue to itself result in <tt>IllegalArgumentException</tt>. Further,
- * the behavior of this operation is undefined if the specified collection is modified while the operation is in
- * progress.
- *
- * @param objects The collection to transfer elements into.
- *
- * @return The number of elements transferred.
- *
- * @throws NullPointerException If objects is null.
- * @throws IllegalArgumentException If objects is this queue.
- */
- public int drainTo(Collection<? super E> objects)
- {
- return drainTo(objects, -1);
- }
-
- /**
- * Removes at most the given number of available elements from this queue and adds them into the given collection.
- * A failure encountered while attempting to <tt>add</tt> elements to collection <tt>c</tt> may result in elements
- * being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue
- * to itself result in <tt>IllegalArgumentException</tt>. Further, the behavior of this operation is undefined if
- * the specified collection is modified while the operation is in progress.
- *
- * @param objects The collection to transfer elements into.
- * @param maxElements The maximum number of elements to transfer. If this is -1 then that is interpreted as meaning
- * all elements.
- *
- * @return The number of elements transferred.
- *
- * @throws NullPointerException If c is null.
- * @throws IllegalArgumentException If c is this queue.
- */
- public int drainTo(Collection<? super E> objects, int maxElements)
- {
- if (objects == null)
- {
- throw new NullPointerException();
- }
-
- if (objects == this)
- {
- throw new IllegalArgumentException();
- }
-
- // final Queue<E> items = this.buffer;
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- int n = 0;
-
- for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
- {
- // Take items from the queue, do unblock the producers, but don't send not full signals yet.
- objects.add(extract(true, false).getElement());
- }
-
- if (n > 0)
- {
- // count -= n;
- notFull.signalAll();
- }
-
- return n;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * Takes all available data items from the queue or blocks until some become available. The returned items
- * are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
- * producers, where the producers are still blocked.
- *
- * @param c The collection to drain the data items into.
- * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
- *
- * @return A count of the number of elements that were drained from the queue.
- */
- public SynchRef drainTo(Collection<SynchRecord<E>> c, boolean unblock)
- {
- return drainTo(c, -1, unblock);
- }
-
- /**
- * Takes up to maxElements available data items from the queue or blocks until some become available. The returned
- * items are wrapped in a {@link SynchRecord} which provides an interface to requeue them or send errors to their
- * producers, where the producers are still blocked.
- *
- * @param coll The collection to drain the data items into.
- * @param maxElements The maximum number of elements to drain.
- * @param unblock If set to <tt>true</tt> the producers for the taken items will be immediately unblocked.
- *
- * @return A count of the number of elements that were drained from the queue.
- */
- public SynchRef drainTo(Collection<SynchRecord<E>> coll, int maxElements, boolean unblock)
- {
- if (coll == null)
- {
- throw new NullPointerException();
- }
-
- // final Queue<E> items = this.buffer;
- final ReentrantLock lock = this.lock;
- lock.lock();
-
- try
- {
- int n = 0;
-
- for (int max = ((maxElements >= count) || (maxElements < 0)) ? count : maxElements; n < max; n++)
- {
- // Extract the next record from the queue, don't signall the not full condition yet and release
- // producers depending on whether the caller wants to or not.
- coll.add(extract(false, unblock));
- }
-
- if (n > 0)
- {
- // count -= n;
- notFull.signalAll();
- }
-
- return new SynchRefImpl(n, coll);
- }
- finally
- {
- lock.unlock();
- }
- }
-
- /**
- * This abstract method should be overriden to return an empty queue. Different implementations of producer
- * consumer buffers can control the order in which data is accessed using different queue implementations.
- * This method allows the type of queue to be abstracted out of this class and to be supplied by concrete
- * implementations.
- *
- * @return An empty queue.
- */
- protected abstract <T> Queue<T> createQueue();
-
- /**
- * Insert element into the queue, then possibly signal that the queue is not empty and block the producer
- * on the element until permission to procede is given.
- *
- * <p/>If the producer is to be blocked then the lock must be released first, otherwise no other process
- * will be able to get access to the queue. Hence, unlock and block are always set together.
- *
- * <p/>Call only when holding the global lock.
- *
- * @param unlockAndBlock <tt>true</tt>If the global queue lock should be released and the producer should be blocked.
- *
- * @return <tt>true</tt> if the operation succeeded, <tt>false</tt> otherwise. If the result is <tt>true</tt> this
- * method may not return straight away, but only after the producer is unblocked by having its data
- * consumed if the unlockAndBlock flag is set. In the false case the method will return straight away, no
- * matter what value the unlockAndBlock flag has, leaving the global lock on.
- */
- protected boolean insert(E x, boolean unlockAndBlock)
- {
- // Create a new record for the data item.
- SynchRecordImpl<E> record = new SynchRecordImpl<E>(x);
-
- boolean result = buffer.offer(record);
-
- if (result)
- {
- count++;
-
- // Tell any waiting consumers that the queue is not empty.
- notEmpty.signal();
-
- if (unlockAndBlock)
- {
- // Allow other threads to read/write the queue.
- lock.unlock();
-
- // Wait until a consumer takes this data item.
- record.waitForConsumer();
- }
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
- /**
- * Extract element at current take position, advance, and signal.
- *
- * <p/>Call only when holding lock.
- */
- protected SynchRecordImpl<E> extract(boolean unblock, boolean signal)
- {
- SynchRecordImpl<E> result = buffer.remove();
- count--;
-
- if (signal)
- {
- notFull.signal();
- }
-
- if (unblock)
- {
- result.releaseImmediately();
- }
-
- return result;
- }
-
- /**
- * Get the capacity of the buffer. If the buffer has no maximum capacity then Integer.MAX_VALUE is returned.
- *
- * <p/>Call only when holding lock.
- *
- * @return The maximum capacity of the buffer.
- */
- protected int getBufferCapacity()
- {
- if (buffer instanceof Capacity)
- {
- return ((Capacity) buffer).getCapacity();
- }
- else
- {
- return Integer.MAX_VALUE;
- }
- }
-
- /**
- * Return the head element from the buffer.
- *
- * <p/>Call only when holding lock.
- *
- * @return The head element from the buffer.
- */
- protected E peekAtBufferHead()
- {
- return buffer.peek().getElement();
- }
-
- public class SynchRefImpl implements SynchRef
- {
- /** Holds the number of synch records associated with this reference. */
- private int numRecords;
-
- /** Holds a reference to the collection of synch records managed by this. */
- private Collection<SynchRecord<E>> records;
-
- public SynchRefImpl(int n, Collection<SynchRecord<E>> records)
- {
- this.numRecords = n;
- this.records = records;
- }
-
- public int getNumRecords()
- {
- return numRecords;
- }
-
- /**
- * Any producers that have had their data elements taken from the queue but have not been unblocked are unblocked
- * when this method is called. The exception to this is producers that have had their data put back onto the queue
- * by a consumer. Producers that have had exceptions for their data items registered by consumers will be unblocked
- * but will not return from their put call normally, but with an exception instead.
- */
- public void unblockProducers()
- {
- log.debug("public void unblockProducers(): called");
-
- if (records != null)
- {
- for (SynchRecord<E> record : records)
- {
- // This call takes account of items that have already been released, are to be requeued or are in
- // error.
- record.releaseImmediately();
- }
- }
-
- records = null;
- }
- }
-
- /**
- * A SynchRecordImpl is used by a {@link BatchSynchQueue} to pair together a producer with its data. This allows
- * the producer of data to be identified so that it can be unblocked when its data is consumed or sent errors when
- * its data cannot be consumed.
- */
- public class SynchRecordImpl<E> implements SynchRecord<E>
- {
- /** A boolean latch that determines when the producer for this data item will be allowed to continue. */
- private BooleanLatch latch = new BooleanLatch();
-
- /** The data element associated with this item. */
- private E element;
-
- /**
- * Create a new synch record.
- *
- * @param e The data element that the record encapsulates.
- */
- public SynchRecordImpl(E e)
- {
- // Keep the data element.
- element = e;
- }
-
- /**
- * Waits until the producer is given permission to proceded by a consumer.
- */
- public void waitForConsumer()
- {
- latch.await();
- }
-
- /**
- * Gets the data element contained by this record.
- *
- * @return The data element contained by this record.
- */
- public E getElement()
- {
- return element;
- }
-
- /**
- * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
- * producers to a minimum by using this method to release them at the earliest possible moment when batch
- * consuming records from sychronized producers.
- */
- public void releaseImmediately()
- {
- // Check that the record has not already been released, is in error or is to be requeued.
- latch.signal();
-
- // Propagate errors to the producer.
-
- // Requeue items to be requeued.
- }
-
- /**
- * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
- * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method or
- * the {@link #releaseImmediately()} method.
- *
- * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
- * element has already been unblocked.
- */
- public void reQueue()
- {
- throw new RuntimeException("Not implemented.");
- }
-
- /**
- * Tells the synch queue to raise an exception with this elements producer. The exception is not raised
- * immediately but upon calling the {@link SynchRef#unblockProducers()} method or the
- * {@link #releaseImmediately()} method. The exception will be wrapped in a {@link SynchException} before it is
- * raised on the producer.
- *
- * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
- * because the exception is to be passed onto a different thread.
- *
- * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this
- * element has already been unblocked.
- *
- * @param e The exception to raise on the producer.
- */
- public void inError(Exception e)
- {
- throw new RuntimeException("Not implemented.");
- }
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
deleted file mode 100644
index 0e4a07594f..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/BooleanLatch.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-
-/**
- * A BooleanLatch is like a set of traffic lights, where threads can wait at a red light until another thread gives
- * the green light. When threads arrive at the latch it is initially red. They queue up until the green signal is
- * given, at which point they can all acquire the latch in shared mode and continue to run concurrently. Once the latch
- * is signalled it cannot be reset to red again.
- *
- * <p/> The latch uses a {@link java.util.concurrent.locks.AbstractQueuedSynchronizer} to implement its synchronization.
- * This has two internal states, 0 which means that the latch is blocked, and 1 which means that the latch is open.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Block threads until a go signal is given.
- * </table>
- *
- * @todo Might be better to use a countdown latch to count down from 1. Its await method can throw interrupted
- * exception which makes the possibility of interruption more explicit, and provides a reminder to recheck the
- * latch condition before continuing.
- */
-public class BooleanLatch
-{
- /** Holds the synchronizer that provides the thread queueing synchronization. */
- private final Sync sync = new Sync();
-
- /**
- * Tests whether or not the latch has been signalled, that is to say that, the light is green.
- *
- * <p/>This method is non-blocking.
- *
- * @return <tt>true</tt> if the latch may be acquired; the light is green.
- */
- public boolean isSignalled()
- {
- return sync.isSignalled();
- }
-
- /**
- * Waits on the latch until the signal is given and the light is green. If the light is already green then the
- * latch will be acquired and the thread will not have to wait.
- *
- * <p/>This method will block until the go signal is given or the thread is otherwise interrupted. Before carrying
- * out any processing threads that return from this method should confirm that the go signal has really been given
- * on this latch by calling the {@link #isSignalled()} method.
- */
- public void await()
- {
- sync.acquireShared(1);
- }
-
- /**
- * Releases any threads currently waiting on the latch. This flips the light to green allowing any threads that
- * were waiting for this condition to now run.
- *
- * <p/>This method is non-blocking.
- */
- public void signal()
- {
- sync.releaseShared(1);
- }
-
- /**
- * Implements a thread queued synchronizer. The internal state 0 means that the queue is blocked and the internl
- * state 1 means that the queue is released and that all waiting threads can acquire the synchronizer in shared
- * mode.
- */
- private static class Sync extends AbstractQueuedSynchronizer
- {
- /**
- * Attempts to acquire this synchronizer in shared mode. It may be acquired once it has been released.
- *
- * @param ignore This parameter is ignored.
- *
- * @return 1 if the shared acquisition succeeds and -1 if it fails.
- */
- protected int tryAcquireShared(int ignore)
- {
- return isSignalled() ? 1 : -1;
- }
-
- /**
- * Releases the synchronizer, setting its internal state to 1.
- *
- * @param ignore This parameter is ignored.
- *
- * @return <tt>true</tt> always.
- */
- protected boolean tryReleaseShared(int ignore)
- {
- setState(1);
-
- return true;
- }
-
- /**
- * Tests if the synchronizer is signalled. It is signalled when its internal state it 1.
- *
- * @return <tt>true</tt> if the internal state is 1, <tt>false</tt> otherwise.
- */
- boolean isSignalled()
- {
- return getState() != 0;
- }
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java
deleted file mode 100644
index a97ce0e172..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/Capacity.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/**
- * An interface exposed by data structures that have a maximum capacity.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Report the maximum capacity.
- * </table>
- */
-public interface Capacity
-{
- public int getCapacity();
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java
deleted file mode 100644
index bc63eb0353..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchBuffer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.Queue;
-
-/**
- * SynchBuffer completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying
- * queue as an array. This uses FIFO ordering for the queue but restricts the maximum size of the queue to a fixed
- * amount. It also has the advantage that, as the buffer does not grow and shrink dynamically, memory for the buffer
- * is allocated up front and does not create garbage during the operation of the queue.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide array based FIFO queue to create a batch synched queue around.
- * </table>
- *
- * @todo Write an array based buffer implementation that implements Queue.
- */
-public class SynchBuffer<E> extends BatchSynchQueueBase<E>
-{
- /**
- * Returns an empty queue, implemented as an array.
- *
- * @return An empty queue, implemented as an array.
- */
- protected <T> Queue<T> createQueue()
- {
- throw new RuntimeException("Not implemented.");
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java
deleted file mode 100644
index 23ee695079..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/**
- * SynchException is used to encapsulate exceptions with the data elements that caused them in order to send exceptions
- * back from the consumers of a {@link BatchSynchQueue} to producers. The underlying exception should be retrieved from
- * the {@link #getCause} method.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Encapsulate a data element and exception.
- * </table>
- */
-public class SynchException extends Exception
-{
- /** Holds the data element that is in error. */
- private Object element;
-
- /**
- * Creates a new BaseApplicationException object.
- *
- * @param message The exception message.
- * @param cause The underlying throwable cause. This may be null.
- */
- public SynchException(String message, Throwable cause, Object element)
- {
- super(message, cause);
-
- // Keep the data element that was in error.
- this.element = element;
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java
deleted file mode 100644
index 95833f398a..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchQueue.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- * SynchQueue completes the {@link BatchSynchQueueBase} abstract class by providing an implementation of the underlying
- * queue as a linked list. This uses FIFO ordering for the queue and allows the queue to grow to accomodate more
- * elements as needed.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide linked list FIFO queue to create a batch synched queue around.
- * </table>
- */
-public class SynchQueue<E> extends BatchSynchQueueBase<E>
-{
- /**
- * Returns an empty queue, implemented as a linked list.
- *
- * @return An empty queue, implemented as a linked list.
- */
- protected <T> Queue<T> createQueue()
- {
- return new LinkedList<T>();
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java
deleted file mode 100644
index fd740c20cd..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRecord.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/**
- * SynchRecord associates a data item from a {@link BatchSynchQueue} with its producer. This enables the data item data
- * item to be put back on the queue without unblocking its producer, or to send exceptions to the producer.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Get the underlying data element.
- * <tr><td> Put the data element back on the queue without unblocking its producer.
- * <tr><td> Send and exception to the data elements producer.
- * </table>
- */
-public interface SynchRecord<E>
-{
- /**
- * Gets the data element contained by this record.
- *
- * @return The data element contained by this record.
- */
- public E getElement();
-
- /**
- * Tells the synch queue to put this element back onto the queue instead of releasing its producer.
- * The element is not requeued immediately but upon calling the {@link SynchRef#unblockProducers()} method.
- *
- * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element
- * has already been unblocked.
- */
- public void reQueue();
-
- /**
- * Immediately releases the producer of this data record. Consumers can bring the synchronization time of
- * producers to a minimum by using this method to release them at the earliest possible moment when batch
- * consuming records from sychronized producers.
- */
- public void releaseImmediately();
-
- /**
- * Tells the synch queue to raise an exception with this elements producer. The exception is not raised immediately
- * but upon calling the {@link SynchRef#unblockProducers()} method. The exception will be wrapped in a
- * {@link SynchException} before it is raised on the producer.
- *
- * <p/>This method is unusual in that it accepts an exception as an argument. This is non-standard but is used
- * because the exception is to be passed onto a different thread.
- *
- * <p/>This method will raise a runtime exception {@link AlreadyUnblockedException} if the producer for this element
- * has already been unblocked.
- *
- * @param e The exception to raise on the producer.
- */
- public void inError(Exception e);
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java b/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java
deleted file mode 100644
index efe2344c06..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/concurrent/SynchRef.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.qpid.util.concurrent;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/**
- * A SynchRef is an interface which is returned from the synchronous take and drain methods of {@link BatchSynchQueue},
- * allowing call-backs to be made against the synchronizing strucutre. It allows the consumer to communicate when it
- * wants producers that have their data taken to be unblocked.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities
- * <tr><td> Report number of records returned by a taking operation.
- * <tr><td> Provide call-back to release producers of taken records.
- * </table>
- */
-public interface SynchRef
-{
- /**
- * Reports the number of records taken by the take or drain operation.
- *
- * @return The number of records taken by the take or drain operation.
- */
- public int getNumRecords();
-
- /**
- * Any producers that have had their data elements taken from the queue but have not been unblocked are
- * unblocked when this method is called. The exception to this is producers that have had their data put back
- * onto the queue by a consumer. Producers that have had exceptions for their data items registered by consumers
- * will be unblocked but will not return from their put call normally, but with an exception instead.
- */
- public void unblockProducers();
-}