summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java611
1 files changed, 369 insertions, 242 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index ccb3c0bf57..04f3c5ee17 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -33,7 +33,7 @@ import java.nio.charset.Charset;
*/
public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
{
- public static final String MIME_TYPE="jms/stream-message";
+ public static final String MIME_TYPE="jms/stream-message";
private static final byte BOOLEAN_TYPE = (byte) 1;
@@ -55,6 +55,8 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
private static final byte STRING_TYPE = (byte) 10;
+ private static final byte NULL_STRING_TYPE = (byte) 11;
+
/**
* This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
* a byte array in multiple chunks, hence this is used to track how much is left to be read
@@ -89,7 +91,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
return MIME_TYPE;
}
- private byte readAndCheckType() throws MessageFormatException, MessageEOFException,
+ private byte readWireType() throws MessageFormatException, MessageEOFException,
MessageNotReadableException
{
checkReadable();
@@ -105,22 +107,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public boolean readBoolean() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
boolean result;
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private boolean readBooleanImpl()
@@ -130,20 +142,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public byte readByte() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
byte result;
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -155,24 +177,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public short readShort() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
short result;
- switch (wireType)
+ try
{
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -190,15 +222,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
*/
public char readChar() throws JMSException
{
- byte wireType = readAndCheckType();
- if (wireType != CHAR_TYPE)
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
{
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
}
- else
+ catch (RuntimeException e)
{
- checkAvailable(2);
- return readCharImpl();
+ _data.position(position);
+ throw e;
}
}
@@ -209,30 +251,40 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public int readInt() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
int result;
- switch (wireType)
+ try
{
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private int readIntImpl()
@@ -242,34 +294,44 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public long readLong() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
long result;
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private long readLongImpl()
@@ -279,22 +341,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public float readFloat() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
float result;
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private float readFloatImpl()
@@ -304,26 +376,36 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public double readDouble() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
double result;
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private double readDoubleImpl()
@@ -333,50 +415,63 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public String readString() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
String result;
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private String readStringImpl() throws JMSException
@@ -406,7 +501,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
// type discriminator checked separately so you get a MessageFormatException rather than
// an EOF even in the case where both would be applicable
checkAvailable(1);
- byte wireType = readAndCheckType();
+ byte wireType = readWireType();
if (wireType != BYTEARRAY_TYPE)
{
throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
@@ -431,18 +526,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
}
}
}
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
- return readBytesImpl(bytes);
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
}
private int readBytesImpl(byte[] bytes)
{
int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
_byteArrayRemaining -= count;
- if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- }
+
if (count == 0)
{
return 0;
@@ -456,62 +558,74 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public Object readObject() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
Object result = null;
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ result = new byte[size];
+ readBytesImpl(new byte[size]);
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- result = new byte[size];
- readBytesImpl(new byte[size]);
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
public void writeBoolean(boolean b) throws JMSException
@@ -564,18 +678,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeString(String string) throws JMSException
{
- writeTypeDiscriminator(STRING_TYPE);
- try
+ if (string == null)
{
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte)0);
+ writeTypeDiscriminator(NULL_STRING_TYPE);
}
- catch (CharacterCodingException e)
+ else
{
- JMSException ex = new JMSException("Unable to encode string: " + e);
- ex.setLinkedException(e);
- throw ex;
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte)0);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
}
@@ -601,11 +722,17 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeObject(Object object) throws JMSException
{
checkWritable();
+ Class clazz = null;
if (object == null)
{
- throw new NullPointerException("Argument must not be null");
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
}
- Class clazz = object.getClass();
+
if (clazz == Byte.class)
{
writeByte((Byte) object);