diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java | 760 |
1 files changed, 724 insertions, 36 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index ddeb62fbf6..85818dcd2b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -21,96 +21,784 @@ package org.apache.qpid.client.message; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; -import java.nio.ByteBuffer; import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.transport.util.Functions; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; /** * @author Apache Software Foundation */ -public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage +public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage { - protected boolean _readableMessage = false; - AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage) + protected static final byte BOOLEAN_TYPE = (byte) 1; + + protected static final byte BYTE_TYPE = (byte) 2; + + protected static final byte BYTEARRAY_TYPE = (byte) 3; + + protected static final byte SHORT_TYPE = (byte) 4; + + protected static final byte CHAR_TYPE = (byte) 5; + + protected static final byte INT_TYPE = (byte) 6; + + protected static final byte LONG_TYPE = (byte) 7; + + protected static final byte FLOAT_TYPE = (byte) 8; + + protected static final byte DOUBLE_TYPE = (byte) 9; + + protected static final byte STRING_TYPE = (byte) 10; + + protected static final byte NULL_STRING_TYPE = (byte) 11; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory) + { + + this(delegateFactory, null); + } + + /** + * Construct a stream message with existing data. + * + * @param delegateFactory + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + */ + AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { - super(delegateFactory, fromReceivedMessage); // this instanties a content header - _readableMessage = fromReceivedMessage; + super(delegateFactory, data); // this instanties a content header } - AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException + AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, fromReceivedMessage); - _readableMessage = fromReceivedMessage; + super(delegate, data); + } + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkReadable(); + checkAvailable(1); + return _data.get(); } - protected void checkReadable() throws MessageNotReadableException + protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException { - if (!_readableMessage) + checkWritable(); + _data.put(type); + _changedData = true; + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; } } - @Override - protected void checkWritable() throws MessageNotWriteableException + private boolean readBooleanImpl() { - super.checkWritable(); - if(_readableMessage) + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try { - throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; } - public void clearBody() throws JMSException + private byte readByteImpl() { - super.clearBody(); - _readableMessage = false; + return _data.get(); } + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } - public String toBodyString() throws JMSException + private short readShortImpl() { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); try { - ByteBuffer data = getData(); - if (data != null) - { - return Functions.str(data, 100, 0); - } - else - { - return ""; + if(wireType == NULL_STRING_TYPE){ + throw new NullPointerException(); } + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); } - catch (Exception e) + catch (CharacterCodingException e) { - JMSException jmse = new JMSException(e.toString()); + JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); jmse.setLinkedException(e); jmse.initCause(e); throw jmse; } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) + { + _data.putInt(i); + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); } + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } - abstract public void reset(); + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + try + { + writeStringImpl(string); + } + catch (CharacterCodingException e) + { + JMSException jmse = new JMSException("Unable to encode string: " + e); + jmse.setLinkedException(e); + jmse.initCause(e); + throw jmse; + } + } + } + + protected void writeStringImpl(String string) + throws CharacterCodingException + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte) 0); + } + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + protected void writeObject(Object object) throws JMSException + { + checkWritable(); + Class clazz; + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } } |