diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing')
28 files changed, 215 insertions, 468 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 363d9f1ccc..cb0c78ef37 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public interface AMQBody { public byte getFrameType(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index e77e5942e3..fd42084429 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -20,9 +20,7 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index b6f2fb18ea..9d5e654ad0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataInputStream; +import org.apache.qpid.codec.MarkableDataInput; + import java.io.IOException; public class AMQDataBlockDecoder @@ -39,7 +39,7 @@ public class AMQDataBlockDecoder _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); } - Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); + private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); public AMQDataBlockDecoder() { } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 9b5699e8ff..238f28e73e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -22,8 +22,8 @@ package org.apache.qpid.framing; import org.apache.qpid.codec.MarkableDataInput; -import java.io.*; import java.io.DataOutput; +import java.io.IOException; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 2170ebf992..966a03605c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -26,7 +26,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; public interface AMQMethodBody extends AMQBody @@ -53,10 +52,6 @@ public interface AMQMethodBody extends AMQBody public void writePayload(DataOutput buffer) throws IOException; - //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException; - - //public void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException; - public AMQFrame generateFrame(int channelId); public String toString(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index ec6d662726..7fe293b6b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -20,14 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataInput; -import java.io.DataInputStream; +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.IOException; public class AMQMethodBodyFactory implements BodyFactory diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 4ff7827d7f..85870e68c5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -25,11 +25,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import java.util.*; import java.lang.ref.WeakReference; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.WeakHashMap; /** * A short string is a representation of an AMQ Short String @@ -318,7 +321,6 @@ public final class AMQShortString implements CharSequence, Comparable<AMQShortSt { final int size = length(); - //buffer.setAutoExpand(true); buffer.writeByte(size); buffer.write(_data, _offset, size); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java index a07fd78c8c..94a7d127b3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java @@ -27,6 +27,10 @@ public class AMQTypeMap { public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<Byte, AMQType>(); + private AMQTypeMap() + { + } + static { for(AMQType type : AMQType.values()) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index f64164c10b..c4dc86bf11 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.framing; -import java.io.*; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; import java.util.Date; import java.util.Map; -import java.math.BigDecimal; /** * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 2739f7d14b..eb528159c0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -20,14 +20,13 @@ */ package org.apache.qpid.framing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BasicContentHeaderProperties implements CommonContentHeaderProperties { //persistent & non-persistent constants, values as per JMS DeliveryMode @@ -89,7 +88,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public int getPropertyListSize() { - if(_encodedForm != null && (_headers == null || _headers.isClean())) + if(useEncodedForm()) { return _encodedForm.length; } @@ -190,7 +189,7 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti public void writePropertyListPayload(DataOutput buffer) throws IOException { - if(_encodedForm != null && (_headers == null || !_headers.isClean())) + if(useEncodedForm()) { buffer.write(_encodedForm); } @@ -295,85 +294,83 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException { - // ByteBuffer buffer = ByteBuffer.wrap(_encodedForm); + int headersOffset = 0; - int headersOffset = 0; - - if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) - { - _contentType = buffer.readAMQShortString(); - headersOffset += EncodingUtils.encodedShortStringLength(_contentType); - } + if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0) + { + _contentType = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_contentType); + } - if ((_propertyFlags & ENCODING_MASK) != 0) - { - _encoding = buffer.readAMQShortString(); - headersOffset += EncodingUtils.encodedShortStringLength(_encoding); - } + if ((_propertyFlags & ENCODING_MASK) != 0) + { + _encoding = buffer.readAMQShortString(); + headersOffset += EncodingUtils.encodedShortStringLength(_encoding); + } - if ((_propertyFlags & HEADERS_MASK) != 0) - { - long length = EncodingUtils.readUnsignedInteger(buffer); + if ((_propertyFlags & HEADERS_MASK) != 0) + { + long length = EncodingUtils.readUnsignedInteger(buffer); - _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); + _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length); - buffer.skipBytes((int)length); - } + buffer.skipBytes((int)length); + } - if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) - { - _deliveryMode = buffer.readByte(); - } + if ((_propertyFlags & DELIVERY_MODE_MASK) != 0) + { + _deliveryMode = buffer.readByte(); + } - if ((_propertyFlags & PRIORITY_MASK) != 0) - { - _priority = buffer.readByte(); - } + if ((_propertyFlags & PRIORITY_MASK) != 0) + { + _priority = buffer.readByte(); + } - if ((_propertyFlags & CORRELATION_ID_MASK) != 0) - { - _correlationId = buffer.readAMQShortString(); - } + if ((_propertyFlags & CORRELATION_ID_MASK) != 0) + { + _correlationId = buffer.readAMQShortString(); + } - if ((_propertyFlags & REPLY_TO_MASK) != 0) - { - _replyTo = buffer.readAMQShortString(); - } + if ((_propertyFlags & REPLY_TO_MASK) != 0) + { + _replyTo = buffer.readAMQShortString(); + } - if ((_propertyFlags & EXPIRATION_MASK) != 0) - { - _expiration = EncodingUtils.readLongAsShortString(buffer); - } + if ((_propertyFlags & EXPIRATION_MASK) != 0) + { + _expiration = EncodingUtils.readLongAsShortString(buffer); + } - if ((_propertyFlags & MESSAGE_ID_MASK) != 0) - { - _messageId = buffer.readAMQShortString(); - } + if ((_propertyFlags & MESSAGE_ID_MASK) != 0) + { + _messageId = buffer.readAMQShortString(); + } - if ((_propertyFlags & TIMESTAMP_MASK) != 0) - { - _timestamp = EncodingUtils.readTimestamp(buffer); - } + if ((_propertyFlags & TIMESTAMP_MASK) != 0) + { + _timestamp = EncodingUtils.readTimestamp(buffer); + } - if ((_propertyFlags & TYPE_MASK) != 0) - { - _type = buffer.readAMQShortString(); - } + if ((_propertyFlags & TYPE_MASK) != 0) + { + _type = buffer.readAMQShortString(); + } - if ((_propertyFlags & USER_ID_MASK) != 0) - { - _userId = buffer.readAMQShortString(); - } + if ((_propertyFlags & USER_ID_MASK) != 0) + { + _userId = buffer.readAMQShortString(); + } - if ((_propertyFlags & APPLICATION_ID_MASK) != 0) - { - _appId = buffer.readAMQShortString(); - } + if ((_propertyFlags & APPLICATION_ID_MASK) != 0) + { + _appId = buffer.readAMQShortString(); + } - if ((_propertyFlags & CLUSTER_ID_MASK) != 0) - { - _clusterId = buffer.readAMQShortString(); - } + if ((_propertyFlags & CLUSTER_ID_MASK) != 0) + { + _clusterId = buffer.readAMQShortString(); + } } @@ -655,4 +652,10 @@ public class BasicContentHeaderProperties implements CommonContentHeaderProperti + _expiration + ",JMSPriority = " + _priority + ",JMSTimestamp = " + _timestamp + ",JMSType = " + _type; } + private boolean useEncodedForm() + { + return _encodedForm != null && (_headers == null || _headers.isClean()); + } + + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java index 656185629b..196ab422a3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java @@ -2,8 +2,6 @@ package org.apache.qpid.framing; import org.apache.qpid.codec.MarkableDataInput; -import java.io.IOException; - public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput { private byte[] _data; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 541d104dc9..6d6ec708d0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public class ContentBody implements AMQBody { public static final byte TYPE = 3; - public byte[] _payload; + private byte[] _payload; public ContentBody() { @@ -42,7 +42,7 @@ public class ContentBody implements AMQBody public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException { _payload = new byte[(int)size]; - buffer.readFully(_payload); + buffer.readFully(getPayload()); } @@ -58,12 +58,12 @@ public class ContentBody implements AMQBody public int getSize() { - return _payload == null ? 0 : _payload.length; + return getPayload() == null ? 0 : getPayload().length; } public void writePayload(DataOutput buffer) throws IOException { - buffer.write(_payload); + buffer.write(getPayload()); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -77,7 +77,7 @@ public class ContentBody implements AMQBody if (size > 0) { _payload = new byte[(int)size]; - buffer.read(_payload); + buffer.read(getPayload()); } } @@ -86,6 +86,11 @@ public class ContentBody implements AMQBody { } + public byte[] getPayload() + { + return _payload; + } + private static class BufferContentBody implements AMQBody { private final int _length; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java index de2ffe9755..10df105ee6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBodyFactory.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import java.io.IOException; - -import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.codec.MarkableDataInput; + +import java.io.IOException; + public class ContentBodyFactory implements BodyFactory { private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 8a2ad53157..f6fa89a91c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -20,24 +20,23 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public class ContentHeaderBody implements AMQBody { public static final byte TYPE = 2; - public int classId; + private int classId; - public int weight; + private int weight; - /** unsigned long but java can't handle that anyway when allocating byte array */ - public long bodySize; + private long bodySize; /** must never be null */ private ContentHeaderProperties properties; @@ -76,17 +75,6 @@ public class ContentHeaderBody implements AMQBody return TYPE; } - protected void populateFromBuffer(DataInputStream buffer, long size) - throws AMQFrameDecodingException, AMQProtocolVersionException, IOException - { - classId = buffer.readUnsignedShort(); - weight = buffer.readUnsignedShort(); - bodySize = buffer.readLong(); - int propertyFlags = buffer.readUnsignedShort(); - ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance(); - properties = factory.createContentHeaderProperties(classId, propertyFlags, buffer, (int)size - 14); - } - /** * Helper method that is used currently by the persistence layer (by BDB at the moment). * @param buffer @@ -153,4 +141,25 @@ public class ContentHeaderBody implements AMQBody ", properties=" + properties + '}'; } + + public int getClassId() + { + return classId; + } + + public int getWeight() + { + return weight; + } + + /** unsigned long but java can't handle that anyway when allocating byte array */ + public long getBodySize() + { + return bodySize; + } + + public void setBodySize(long bodySize) + { + this.bodySize = bodySize; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java index c3e4c69ec0..83a5211013 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBodyFactory.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; -import java.io.IOException; - -import org.apache.qpid.codec.MarkableDataInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.codec.MarkableDataInput; + +import java.io.IOException; + public class ContentHeaderBodyFactory implements BodyFactory { private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java index ea8358a538..2e1b988aa3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java @@ -21,7 +21,6 @@ package org.apache.qpid.framing; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index 48bd52858d..ff97c0b28f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -20,12 +20,11 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; + import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; -import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; - public class ContentHeaderPropertiesFactory { private static final ContentHeaderPropertiesFactory _instance = new ContentHeaderPropertiesFactory(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java index e018407509..1ecd8a13b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.framing; -import java.io.*; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -40,6 +41,10 @@ public class EncodingUtils public static final int SIZEOF_UNSIGNED_INT = 4; private static final boolean[] ALL_FALSE_ARRAY = new boolean[8]; + private EncodingUtils() + { + } + public static int encodedShortStringLength(String s) { if (s == null) @@ -114,7 +119,7 @@ public class EncodingUtils { return len + 6 + encodedShortStringLength((short) (i / 1000000)); } - else // if (i > 99999) + else // if i > 99999 { return len + 5 + encodedShortStringLength((short) (i / 100000)); } @@ -259,7 +264,6 @@ public class EncodingUtils public static void writeLongStringBytes(DataOutput buffer, String s) throws IOException { - assert (s == null) || (s.length() <= 0xFFFE); if (s != null) { int len = s.length(); @@ -281,7 +285,6 @@ public class EncodingUtils public static void writeLongStringBytes(DataOutput buffer, char[] s) throws IOException { - assert (s == null) || (s.length <= 0xFFFE); if (s != null) { int len = s.length; @@ -302,7 +305,6 @@ public class EncodingUtils public static void writeLongStringBytes(DataOutput buffer, byte[] bytes) throws IOException { - assert (bytes == null) || (bytes.length <= 0xFFFE); if (bytes != null) { writeUnsignedInteger(buffer, bytes.length); @@ -736,8 +738,6 @@ public class EncodingUtils public static long readTimestamp(DataInput buffer) throws IOException { - // Discard msb from AMQ timestamp - // buffer.getUnsignedInt(); return buffer.readLong(); } @@ -802,8 +802,6 @@ public class EncodingUtils byte[] from = new byte[size]; - // Is this not the same. - // bb.get(from, 0, length); for (int i = 0; i < size; i++) { from[i] = bb.get(i); @@ -958,7 +956,6 @@ public class EncodingUtils else { // really writing out unsigned byte - //buffer.put((byte) 0); writeUnsignedInteger(buffer, 0L); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 863e363b87..57f2c638a2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -25,7 +25,11 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQPInvalidClassException; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; import java.math.BigDecimal; import java.util.Collections; import java.util.Enumeration; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java index af0c5b845c..a2d4d27396 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java @@ -21,11 +21,14 @@ package org.apache.qpid.framing; import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; public class FieldTableFactory { + private FieldTableFactory() + { + } + public static FieldTable newFieldTable() { return new FieldTable(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index 95b6246717..1613cd055e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.framing; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; + import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.AMQException; - public class HeartbeatBody implements AMQBody { public static final byte TYPE = 8; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 2925724dc2..0a06f0f1e9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -23,8 +23,9 @@ package org.apache.qpid.framing; import org.apache.qpid.AMQException; import org.apache.qpid.codec.MarkableDataInput; -import java.io.*; - +import java.io.DataOutput; +import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.Arrays; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock @@ -36,14 +37,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData private static final byte CURRENT_PROTOCOL_CLASS = 1; private static final byte TCP_PROTOCOL_INSTANCE = 1; - public final byte[] _protocolHeader; - public final byte _protocolClass; - public final byte _protocolInstance; - public final byte _protocolMajor; - public final byte _protocolMinor; - - -// public ProtocolInitiation() {} + private final byte[] _protocolHeader; + private final byte _protocolClass; + private final byte _protocolInstance; + private final byte _protocolMajor; + private final byte _protocolMinor; public ProtocolInitiation(byte[] protocolHeader, byte protocolClass, byte protocolInstance, byte protocolMajor, byte protocolMinor) { @@ -206,6 +204,26 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData return pv; } + public byte getProtocolClass() + { + return _protocolClass; + } + + public byte getProtocolInstance() + { + return _protocolInstance; + } + + public byte getProtocolMajor() + { + return _protocolMajor; + } + + public byte getProtocolMinor() + { + return _protocolMinor; + } + public String toString() { StringBuffer buffer = new StringBuffer(new String(_protocolHeader)); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java deleted file mode 100644 index dd854dd498..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.framing; - -import java.io.DataOutput; -import java.io.IOException; - -public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock -{ - private AMQDataBlock _firstFrame; - - private AMQDataBlock _block; - - public SmallCompositeAMQDataBlock(AMQDataBlock block) - { - _block = block; - } - - /** - * The encoded block will be logically first before the AMQDataBlocks which are encoded - * into the buffer afterwards. - * @param encodedBlock already-encoded data - * @param block a block to be encoded. - */ - public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block) - { - this(block); - _firstFrame = encodedBlock; - } - - public AMQDataBlock getBlock() - { - return _block; - } - - public AMQDataBlock getFirstFrame() - { - return _firstFrame; - } - - public long getSize() - { - long frameSize = _block.getSize(); - - if (_firstFrame != null) - { - - frameSize += _firstFrame.getSize(); - } - return frameSize; - } - - public void writePayload(DataOutput buffer) throws IOException - { - if (_firstFrame != null) - { - _firstFrame.writePayload(buffer); - } - _block.writePayload(buffer); - - } - - public String toString() - { - if (_block == null) - { - return "No blocks contained in composite frame"; - } - else - { - StringBuilder buf = new StringBuilder(this.getClass().getName()); - buf.append("{encodedBlock=").append(_firstFrame); - - buf.append(" _block=[").append(_block.toString()).append("]"); - - buf.append("}"); - return buf.toString(); - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java deleted file mode 100644 index e770fdd7e4..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import java.io.DataInputStream; -import java.io.IOException; - -import org.apache.qpid.codec.MarkableDataInput; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VersionSpecificRegistry -{ - private static final Logger _log = LoggerFactory.getLogger(VersionSpecificRegistry.class); - - private final byte _protocolMajorVersion; - private final byte _protocolMinorVersion; - - private static final int DEFAULT_MAX_CLASS_ID = 200; - private static final int DEFAULT_MAX_METHOD_ID = 50; - - private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][]; - - private ProtocolVersionMethodConverter _protocolVersionConverter; - - public VersionSpecificRegistry(byte major, byte minor) - { - _protocolMajorVersion = major; - _protocolMinorVersion = minor; - - _protocolVersionConverter = loadProtocolVersionConverters(major, minor); - } - - private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, - byte protocolMinorVersion) - { - try - { - Class<ProtocolVersionMethodConverter> versionMethodConverterClass = - (Class<ProtocolVersionMethodConverter>) Class.forName("org.apache.qpid.framing.MethodConverter_" - + protocolMajorVersion + "_" + protocolMinorVersion); - - return versionMethodConverterClass.newInstance(); - - } - catch (ClassNotFoundException e) - { - _log.warn("Could not find protocol conversion classes for " + protocolMajorVersion + "-" + protocolMinorVersion); - if (protocolMinorVersion != 0) - { - protocolMinorVersion--; - - return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); - } - else if (protocolMajorVersion != 0) - { - protocolMajorVersion--; - - return loadProtocolVersionConverters(protocolMajorVersion, protocolMinorVersion); - } - else - { - return null; - } - - } - catch (IllegalAccessException e) - { - throw new IllegalStateException("Unable to load protocol version converter: ", e); - } - catch (InstantiationException e) - { - throw new IllegalStateException("Unable to load protocol version converter: ", e); - } - } - - public byte getProtocolMajorVersion() - { - return _protocolMajorVersion; - } - - public byte getProtocolMinorVersion() - { - return _protocolMinorVersion; - } - - public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID) - { - try - { - return _registry[classID][methodID]; - } - catch (IndexOutOfBoundsException e) - { - return null; - } - catch (NullPointerException e) - { - return null; - } - } - - public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory) - { - if (_registry.length <= classID) - { - AMQMethodBodyInstanceFactory[][] oldRegistry = _registry; - _registry = new AMQMethodBodyInstanceFactory[classID + 1][]; - System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length); - } - - if (_registry[classID] == null) - { - _registry[classID] = - new AMQMethodBodyInstanceFactory[(methodID > DEFAULT_MAX_METHOD_ID) ? (methodID + 1) - : (DEFAULT_MAX_METHOD_ID + 1)]; - } - else if (_registry[classID].length <= methodID) - { - AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID]; - _registry[classID] = new AMQMethodBodyInstanceFactory[methodID + 1]; - System.arraycopy(oldMethods, 0, _registry[classID], 0, oldMethods.length); - } - - _registry[classID][methodID] = instanceFactory; - - } - - public AMQMethodBody get(short classID, short methodID, MarkableDataInput in, long size) throws AMQFrameDecodingException, IOException - { - AMQMethodBodyInstanceFactory bodyFactory; - try - { - bodyFactory = _registry[classID][methodID]; - } - catch (NullPointerException e) - { - throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", e); - } - catch (IndexOutOfBoundsException e) - { - if (classID >= _registry.length) - { - throw new AMQFrameDecodingException(null, "Class " + classID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", e); - - } - else - { - throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", e); - - } - } - - if (bodyFactory == null) - { - throw new AMQFrameDecodingException(null, "Method " + methodID + " unknown in AMQP version " - + _protocolMajorVersion + "-" + _protocolMinorVersion + " (while trying to decode class " + classID - + " method " + methodID + ".", null); - } - - return bodyFactory.newInstance( in, size); - - } - - public ProtocolVersionMethodConverter getProtocolVersionMethodConverter() - { - return _protocolVersionConverter; - } - - public void configure() - { - _protocolVersionConverter.configure(); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java index e3d5da73da..53c70c8d71 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.framing.abstraction; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.AMQShortString; public class MessagePublishInfoImpl implements MessagePublishInfo diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index 90a730d6f7..b3eb1211a5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -21,12 +21,16 @@ package org.apache.qpid.framing.amqp_0_9; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter @@ -115,7 +119,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot public byte[] getData() { - return _contentBodyChunk._payload; + return _contentBodyChunk.getPayload(); } public void reduceToFit() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java index 3b0cc3cebc..d33749d795 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java @@ -21,12 +21,16 @@ package org.apache.qpid.framing.amqp_0_91; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { @@ -114,7 +118,7 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro public byte[] getData() { - return _contentBodyChunk._payload; + return _contentBodyChunk.getPayload(); } public void reduceToFit() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java index e6d0482f0d..4c7772a3a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -21,12 +21,16 @@ package org.apache.qpid.framing.amqp_8_0; -import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.framing.abstraction.AbstractMethodConverter; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { @@ -59,7 +63,7 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot public byte[] getData() { - return contentBodyChunk._payload; + return contentBodyChunk.getPayload(); } public void reduceToFit() |