diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java | 611 |
1 files changed, 369 insertions, 242 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index ccb3c0bf57..04f3c5ee17 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -33,7 +33,7 @@ import java.nio.charset.Charset; */ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage { - public static final String MIME_TYPE="jms/stream-message"; + public static final String MIME_TYPE="jms/stream-message"; private static final byte BOOLEAN_TYPE = (byte) 1; @@ -55,6 +55,8 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess private static final byte STRING_TYPE = (byte) 10; + private static final byte NULL_STRING_TYPE = (byte) 11; + /** * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read * a byte array in multiple chunks, hence this is used to track how much is left to be read @@ -89,7 +91,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess return MIME_TYPE; } - private byte readAndCheckType() throws MessageFormatException, MessageEOFException, + private byte readWireType() throws MessageFormatException, MessageEOFException, MessageNotReadableException { checkReadable(); @@ -105,22 +107,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public boolean readBoolean() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); boolean result; - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private boolean readBooleanImpl() @@ -130,20 +142,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public byte readByte() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); byte result; - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } return result; } @@ -155,24 +177,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public short readShort() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); short result; - switch (wireType) + try { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } return result; } @@ -190,15 +222,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess */ public char readChar() throws JMSException { - byte wireType = readAndCheckType(); - if (wireType != CHAR_TYPE) + int position = _data.position(); + byte wireType = readWireType(); + try { - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } } - else + catch (RuntimeException e) { - checkAvailable(2); - return readCharImpl(); + _data.position(position); + throw e; } } @@ -209,30 +251,40 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public int readInt() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); int result; - switch (wireType) + try { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private int readIntImpl() @@ -242,34 +294,44 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public long readLong() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); long result; - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private long readLongImpl() @@ -279,22 +341,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public float readFloat() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); float result; - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private float readFloatImpl() @@ -304,26 +376,36 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public double readDouble() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); double result; - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private double readDoubleImpl() @@ -333,50 +415,63 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public String readString() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); String result; - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } private String readStringImpl() throws JMSException @@ -406,7 +501,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess // type discriminator checked separately so you get a MessageFormatException rather than // an EOF even in the case where both would be applicable checkAvailable(1); - byte wireType = readAndCheckType(); + byte wireType = readWireType(); if (wireType != BYTEARRAY_TYPE) { throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); @@ -431,18 +526,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess } } } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } - return readBytesImpl(bytes); + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; } private int readBytesImpl(byte[] bytes) { int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); _byteArrayRemaining -= count; - if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - } + if (count == 0) { return 0; @@ -456,62 +558,74 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public Object readObject() throws JMSException { - byte wireType = readAndCheckType(); + int position = _data.position(); + byte wireType = readWireType(); Object result = null; - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + result = new byte[size]; + readBytesImpl(new byte[size]); + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: result = null; - } - else - { - _byteArrayRemaining = size; - result = new byte[size]; - readBytesImpl(new byte[size]); - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } - return result; } public void writeBoolean(boolean b) throws JMSException @@ -564,18 +678,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeString(String string) throws JMSException { - writeTypeDiscriminator(STRING_TYPE); - try + if (string == null) { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte)0); + writeTypeDiscriminator(NULL_STRING_TYPE); } - catch (CharacterCodingException e) + else { - JMSException ex = new JMSException("Unable to encode string: " + e); - ex.setLinkedException(e); - throw ex; + writeTypeDiscriminator(STRING_TYPE); + try + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte)0); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } } } @@ -601,11 +722,17 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeObject(Object object) throws JMSException { checkWritable(); + Class clazz = null; if (object == null) { - throw new NullPointerException("Argument must not be null"); + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); } - Class clazz = object.getClass(); + if (clazz == Byte.class) { writeByte((Byte) object); |