summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java760
1 files changed, 724 insertions, 36 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
index ddeb62fbf6..85818dcd2b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
@@ -21,96 +21,784 @@
package org.apache.qpid.client.message;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
-import java.nio.ByteBuffer;
import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.util.Functions;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
* @author Apache Software Foundation
*/
-public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage
+public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
{
- protected boolean _readableMessage = false;
- AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage)
+ protected static final byte BOOLEAN_TYPE = (byte) 1;
+
+ protected static final byte BYTE_TYPE = (byte) 2;
+
+ protected static final byte BYTEARRAY_TYPE = (byte) 3;
+
+ protected static final byte SHORT_TYPE = (byte) 4;
+
+ protected static final byte CHAR_TYPE = (byte) 5;
+
+ protected static final byte INT_TYPE = (byte) 6;
+
+ protected static final byte LONG_TYPE = (byte) 7;
+
+ protected static final byte FLOAT_TYPE = (byte) 8;
+
+ protected static final byte DOUBLE_TYPE = (byte) 9;
+
+ protected static final byte STRING_TYPE = (byte) 10;
+
+ protected static final byte NULL_STRING_TYPE = (byte) 11;
+
+ /**
+ * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
+ * a byte array in multiple chunks, hence this is used to track how much is left to be read
+ */
+ private int _byteArrayRemaining = -1;
+
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory)
+ {
+
+ this(delegateFactory, null);
+ }
+
+ /**
+ * Construct a stream message with existing data.
+ *
+ * @param delegateFactory
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ */
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(delegateFactory, fromReceivedMessage); // this instanties a content header
- _readableMessage = fromReceivedMessage;
+ super(delegateFactory, data); // this instanties a content header
}
- AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException
+ AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(delegate, fromReceivedMessage);
- _readableMessage = fromReceivedMessage;
+ super(delegate, data);
+ }
+
+ protected byte readWireType() throws MessageFormatException, MessageEOFException,
+ MessageNotReadableException
+ {
+ checkReadable();
+ checkAvailable(1);
+ return _data.get();
}
- protected void checkReadable() throws MessageNotReadableException
+ protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
{
- if (!_readableMessage)
+ checkWritable();
+ _data.put(type);
+ _changedData = true;
+ }
+
+ protected boolean readBoolean() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ boolean result;
+ try
{
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
}
- @Override
- protected void checkWritable() throws MessageNotWriteableException
+ private boolean readBooleanImpl()
{
- super.checkWritable();
- if(_readableMessage)
+ return _data.get() != 0;
+ }
+
+ protected byte readByte() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ byte result;
+ try
{
- throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
}
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
}
- public void clearBody() throws JMSException
+ private byte readByteImpl()
{
- super.clearBody();
- _readableMessage = false;
+ return _data.get();
}
+ protected short readShort() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ short result;
+ try
+ {
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
- public String toBodyString() throws JMSException
+ private short readShortImpl()
{
+ return _data.getShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws javax.jms.JMSException
+ */
+ protected char readChar() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
try
{
- ByteBuffer data = getData();
- if (data != null)
- {
- return Functions.str(data, 100, 0);
- }
- else
- {
- return "";
+ if(wireType == NULL_STRING_TYPE){
+ throw new NullPointerException();
}
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private char readCharImpl()
+ {
+ return _data.getChar();
+ }
+
+ protected int readInt() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ int result;
+ try
+ {
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected int readIntImpl()
+ {
+ return _data.getInt();
+ }
+
+ protected long readLong() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ long result;
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private long readLongImpl()
+ {
+ return _data.getLong();
+ }
+
+ protected float readFloat() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ float result;
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private float readFloatImpl()
+ {
+ return _data.getFloat();
+ }
+
+ protected double readDouble() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ double result;
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ private double readDoubleImpl()
+ {
+ return _data.getDouble();
+ }
+
+ protected String readString() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ String result;
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ throw new NullPointerException("data is null");
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected String readStringImpl() throws JMSException
+ {
+ try
+ {
+ return _data.getString(Charset.forName("UTF-8").newDecoder());
}
- catch (Exception e)
+ catch (CharacterCodingException e)
{
- JMSException jmse = new JMSException(e.toString());
+ JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
}
+ }
+
+ protected int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ checkReadable();
+ // first call
+ if (_byteArrayRemaining == -1)
+ {
+ // type discriminator checked separately so you get a MessageFormatException rather than
+ // an EOF even in the case where both would be applicable
+ checkAvailable(1);
+ byte wireType = readWireType();
+ if (wireType != BYTEARRAY_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ }
+ checkAvailable(4);
+ int size = _data.getInt();
+ // length of -1 indicates null
+ if (size == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ if (size > _data.remaining())
+ {
+ throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
+ _data.remaining() + " bytes");
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ }
+ }
+ }
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
+
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
+ }
+
+ private int readBytesImpl(byte[] bytes)
+ {
+ int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
+ _byteArrayRemaining -= count;
+
+ if (count == 0)
+ {
+ return 0;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ protected Object readObject() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ Object result = null;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ byte[] bytesResult = new byte[size];
+ readBytesImpl(bytesResult);
+ result = bytesResult;
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected void writeBoolean(boolean b) throws JMSException
+ {
+ writeTypeDiscriminator(BOOLEAN_TYPE);
+ _data.put(b ? (byte) 1 : (byte) 0);
+ }
+
+ protected void writeByte(byte b) throws JMSException
+ {
+ writeTypeDiscriminator(BYTE_TYPE);
+ _data.put(b);
+ }
+
+ protected void writeShort(short i) throws JMSException
+ {
+ writeTypeDiscriminator(SHORT_TYPE);
+ _data.putShort(i);
+ }
+
+ protected void writeChar(char c) throws JMSException
+ {
+ writeTypeDiscriminator(CHAR_TYPE);
+ _data.putChar(c);
+ }
+
+ protected void writeInt(int i) throws JMSException
+ {
+ writeTypeDiscriminator(INT_TYPE);
+ writeIntImpl(i);
+ }
+
+ protected void writeIntImpl(int i)
+ {
+ _data.putInt(i);
+ }
+
+ protected void writeLong(long l) throws JMSException
+ {
+ writeTypeDiscriminator(LONG_TYPE);
+ _data.putLong(l);
+ }
+ protected void writeFloat(float v) throws JMSException
+ {
+ writeTypeDiscriminator(FLOAT_TYPE);
+ _data.putFloat(v);
}
+ protected void writeDouble(double v) throws JMSException
+ {
+ writeTypeDiscriminator(DOUBLE_TYPE);
+ _data.putDouble(v);
+ }
- abstract public void reset();
+ protected void writeString(String string) throws JMSException
+ {
+ if (string == null)
+ {
+ writeTypeDiscriminator(NULL_STRING_TYPE);
+ }
+ else
+ {
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ writeStringImpl(string);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException jmse = new JMSException("Unable to encode string: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+ }
+
+ protected void writeStringImpl(String string)
+ throws CharacterCodingException
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte) 0);
+ }
+ protected void writeBytes(byte[] bytes) throws JMSException
+ {
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
+ }
+ protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ writeTypeDiscriminator(BYTEARRAY_TYPE);
+ if (bytes == null)
+ {
+ _data.putInt(-1);
+ }
+ else
+ {
+ _data.putInt(length);
+ _data.put(bytes, offset, length);
+ }
+ }
+ protected void writeObject(Object object) throws JMSException
+ {
+ checkWritable();
+ Class clazz;
+ if (object == null)
+ {
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
+ }
+
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeString((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
+ }
}