diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java | 760 |
1 files changed, 36 insertions, 724 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 85818dcd2b..ddeb62fbf6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -21,784 +21,96 @@ package org.apache.qpid.client.message; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.nio.ByteBuffer; import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.transport.util.Functions; /** * @author Apache Software Foundation */ -public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage { + protected boolean _readableMessage = false; - protected static final byte BOOLEAN_TYPE = (byte) 1; - - protected static final byte BYTE_TYPE = (byte) 2; - - protected static final byte BYTEARRAY_TYPE = (byte) 3; - - protected static final byte SHORT_TYPE = (byte) 4; - - protected static final byte CHAR_TYPE = (byte) 5; - - protected static final byte INT_TYPE = (byte) 6; - - protected static final byte LONG_TYPE = (byte) 7; - - protected static final byte FLOAT_TYPE = (byte) 8; - - protected static final byte DOUBLE_TYPE = (byte) 9; - - protected static final byte STRING_TYPE = (byte) 10; - - protected static final byte NULL_STRING_TYPE = (byte) 11; - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; - - AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory) - { - - this(delegateFactory, null); - } - - /** - * Construct a stream message with existing data. - * - * @param delegateFactory - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - */ - AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage) { - super(delegateFactory, data); // this instanties a content header + super(delegateFactory, fromReceivedMessage); // this instanties a content header + _readableMessage = fromReceivedMessage; } - AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException + AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException { - super(delegate, data); - } + super(delegate, fromReceivedMessage); + _readableMessage = fromReceivedMessage; - - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkReadable(); - checkAvailable(1); - return _data.get(); } - protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + protected void checkReadable() throws MessageNotReadableException { - checkWritable(); - _data.put(type); - _changedData = true; - } - - protected boolean readBoolean() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try + if (!_readableMessage) { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; + throw new MessageNotReadableException("You need to call reset() to make the message readable"); } } - private boolean readBooleanImpl() + @Override + protected void checkWritable() throws MessageNotWriteableException { - return _data.get() != 0; - } - - protected byte readByte() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try + super.checkWritable(); + if(_readableMessage) { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; } - private byte readByteImpl() + public void clearBody() throws JMSException { - return _data.get(); + super.clearBody(); + _readableMessage = false; } - protected short readShort() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - private short readShortImpl() + public String toBodyString() throws JMSException { - return _data.getShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws javax.jms.JMSException - */ - protected char readChar() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); try { - if(wireType == NULL_STRING_TYPE){ - throw new NullPointerException(); + ByteBuffer data = getData(); + if (data != null) + { + return Functions.str(data, 100, 0); + } + else + { + return ""; } - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private char readCharImpl() - { - return _data.getChar(); - } - - protected int readInt() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected int readIntImpl() - { - return _data.getInt(); - } - - protected long readLong() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private long readLongImpl() - { - return _data.getLong(); - } - - protected float readFloat() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private float readFloatImpl() - { - return _data.getFloat(); - } - - protected double readDouble() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private double readDoubleImpl() - { - return _data.getDouble(); - } - - protected String readString() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected String readStringImpl() throws JMSException - { - try - { - return _data.getString(Charset.forName("UTF-8").newDecoder()); } - catch (CharacterCodingException e) + catch (Exception e) { - JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + JMSException jmse = new JMSException(e.toString()); jmse.setLinkedException(e); jmse.initCause(e); throw jmse; } - } - - protected int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - checkReadable(); - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // length of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + - _data.remaining() + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; - } - - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - protected Object readObject() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - byte[] bytesResult = new byte[size]; - readBytesImpl(bytesResult); - result = bytesResult; - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected void writeBoolean(boolean b) throws JMSException - { - writeTypeDiscriminator(BOOLEAN_TYPE); - _data.put(b ? (byte) 1 : (byte) 0); - } - - protected void writeByte(byte b) throws JMSException - { - writeTypeDiscriminator(BYTE_TYPE); - _data.put(b); - } - - protected void writeShort(short i) throws JMSException - { - writeTypeDiscriminator(SHORT_TYPE); - _data.putShort(i); - } - - protected void writeChar(char c) throws JMSException - { - writeTypeDiscriminator(CHAR_TYPE); - _data.putChar(c); - } - - protected void writeInt(int i) throws JMSException - { - writeTypeDiscriminator(INT_TYPE); - writeIntImpl(i); - } - - protected void writeIntImpl(int i) - { - _data.putInt(i); - } - - protected void writeLong(long l) throws JMSException - { - writeTypeDiscriminator(LONG_TYPE); - _data.putLong(l); - } - protected void writeFloat(float v) throws JMSException - { - writeTypeDiscriminator(FLOAT_TYPE); - _data.putFloat(v); } - protected void writeDouble(double v) throws JMSException - { - writeTypeDiscriminator(DOUBLE_TYPE); - _data.putDouble(v); - } - protected void writeString(String string) throws JMSException - { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - try - { - writeStringImpl(string); - } - catch (CharacterCodingException e) - { - JMSException jmse = new JMSException("Unable to encode string: " + e); - jmse.setLinkedException(e); - jmse.initCause(e); - throw jmse; - } - } - } - - protected void writeStringImpl(String string) - throws CharacterCodingException - { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte) 0); - } + abstract public void reset(); - protected void writeBytes(byte[] bytes) throws JMSException - { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); - } - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - writeTypeDiscriminator(BYTEARRAY_TYPE); - if (bytes == null) - { - _data.putInt(-1); - } - else - { - _data.putInt(length); - _data.put(bytes, offset, length); - } - } - protected void writeObject(Object object) throws JMSException - { - checkWritable(); - Class clazz; - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } } |