diff options
Diffstat (limited to 'java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java | 363 | ||||
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java | 177 |
2 files changed, 464 insertions, 76 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index cc820a5623..750fb48ce1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.AMQException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.StreamMessage; +import javax.jms.*; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.HashSet; /** * @author Apache Software Foundation @@ -70,6 +71,69 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess private static final byte STRING_TYPE = (byte) 10; /** + * Maps from type on the wire to set of valid types that can be converted to + */ + private static final Map<Byte, Set<Byte>> _typeConversionMap; + + static + { + _typeConversionMap = new HashMap<Byte, Set<Byte>>(); + Set<Byte> set = new HashSet<Byte>(); + set.add(BOOLEAN_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(BOOLEAN_TYPE, set); + set = new HashSet<Byte>(); + set.add(BYTE_TYPE); + set.add(SHORT_TYPE); + set.add(INT_TYPE); + set.add(LONG_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(BYTE_TYPE, set); + set = new HashSet<Byte>(); + set.add(SHORT_TYPE); + set.add(INT_TYPE); + set.add(LONG_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(SHORT_TYPE, set); + set = new HashSet<Byte>(); + set.add(CHAR_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(CHAR_TYPE, set); + set = new HashSet<Byte>(); + set.add(INT_TYPE); + set.add(LONG_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(INT_TYPE, set); + set = new HashSet<Byte>(); + set.add(LONG_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(LONG_TYPE, set); + set = new HashSet<Byte>(); + set.add(FLOAT_TYPE); + set.add(DOUBLE_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(FLOAT_TYPE, set); + set = new HashSet<Byte>(); + set.add(DOUBLE_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(DOUBLE_TYPE, set); + set = new HashSet<Byte>(); + set.add(BOOLEAN_TYPE); + set.add(BYTE_TYPE); + set.add(SHORT_TYPE); + set.add(CHAR_TYPE); + set.add(INT_TYPE); + set.add(LONG_TYPE); + set.add(FLOAT_TYPE); + set.add(DOUBLE_TYPE); + set.add(STRING_TYPE); + _typeConversionMap.put(STRING_TYPE, set); + set = new HashSet<Byte>(); + set.add(BYTEARRAY_TYPE); + _typeConversionMap.put(BYTEARRAY_TYPE, set); + } + + /** * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read * a byte array in multiple chunks, hence this is used to track how much is left to be read */ @@ -79,7 +143,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess { this(null); } - + /** * Construct a stream message with existing data. * @@ -103,25 +167,38 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess return MIME_TYPE; } - private void readAndCheckType(byte type) throws MessageFormatException + private byte readAndCheckType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException { - if (_data.get() != type) - { - throw new MessageFormatException("Type " + _typeNames[type - 1] + " not found next in stream"); - } + checkReadable(); + checkAvailable(1); + return _data.get(); } - private void writeTypeDiscriminator(byte type) + private void writeTypeDiscriminator(byte type) throws MessageNotWriteableException { + checkWritable(); _data.put(type); } public boolean readBoolean() throws JMSException { - checkReadable(); - checkAvailable(2); - readAndCheckType(BOOLEAN_TYPE); - return readBooleanImpl(); + byte wireType = readAndCheckType(); + boolean result = false; + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; } private boolean readBooleanImpl() @@ -131,10 +208,22 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public byte readByte() throws JMSException { - checkReadable(); - checkAvailable(2); - readAndCheckType(BYTE_TYPE); - return readByteImpl(); + byte wireType = readAndCheckType(); + byte result = (byte)0; + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + return result; } private byte readByteImpl() @@ -144,10 +233,26 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public short readShort() throws JMSException { - checkReadable(); - checkAvailable(3); - readAndCheckType(SHORT_TYPE); - return readShortImpl(); + byte wireType = readAndCheckType(); + short result = (short) 0; + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + return result; } private short readShortImpl() @@ -163,10 +268,16 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess */ public char readChar() throws JMSException { - checkReadable(); - checkAvailable(3); - readAndCheckType(CHAR_TYPE); - return readCharImpl(); + byte wireType = readAndCheckType(); + if (wireType != CHAR_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } } private char readCharImpl() @@ -176,10 +287,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public int readInt() throws JMSException { - checkReadable(); - checkAvailable(5); - readAndCheckType(INT_TYPE); - return readIntImpl(); + byte wireType = readAndCheckType(); + int result = 0; + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; } private int readIntImpl() @@ -189,10 +320,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public long readLong() throws JMSException { - checkReadable(); - checkAvailable(9); - readAndCheckType(LONG_TYPE); - return readLongImpl(); + byte wireType = readAndCheckType(); + long result = 0L; + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; } private long readLongImpl() @@ -202,10 +357,22 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public float readFloat() throws JMSException { - checkReadable(); - checkAvailable(5); - readAndCheckType(FLOAT_TYPE); - return readFloatImpl(); + byte wireType = readAndCheckType(); + float result = 0f; + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; } private float readFloatImpl() @@ -215,10 +382,26 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public double readDouble() throws JMSException { - checkReadable(); - checkAvailable(9); - readAndCheckType(DOUBLE_TYPE); - return readDoubleImpl(); + byte wireType = readAndCheckType(); + double result = 0d; + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; } private double readDoubleImpl() @@ -228,12 +411,50 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public String readString() throws JMSException { - checkReadable(); - // we check only for one byte plus the type byte since theoretically the string could be only a - // single byte when using UTF-8 encoding - checkAvailable(2); - readAndCheckType(STRING_TYPE); - return readStringImpl(); + byte wireType = readAndCheckType(); + String result = null; + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; } private String readStringImpl() throws JMSException @@ -260,9 +481,15 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess // first call if (_byteArrayRemaining == -1) { - // type discriminator plus array size - checkAvailable(5); - readAndCheckType(BYTEARRAY_TYPE); + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readAndCheckType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); int size = _data.getInt(); // size of -1 indicates null if (size == -1) @@ -292,7 +519,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess _byteArrayRemaining -= count; if (_byteArrayRemaining == 0) { - _byteArrayRemaining = -1; + _byteArrayRemaining = -1; } if (count == 0) { @@ -307,16 +534,16 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public Object readObject() throws JMSException { - checkReadable(); - checkAvailable(1); - byte type = _data.get(); + byte wireType = readAndCheckType(); Object result = null; - switch (type) + switch (wireType) { case BOOLEAN_TYPE: + checkAvailable(1); result = readBooleanImpl(); break; case BYTE_TYPE: + checkAvailable(1); result = readByteImpl(); break; case BYTEARRAY_TYPE: @@ -334,24 +561,31 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess } break; case SHORT_TYPE: + checkAvailable(2); result = readShortImpl(); break; case CHAR_TYPE: + checkAvailable(2); result = readCharImpl(); break; case INT_TYPE: + checkAvailable(4); result = readIntImpl(); break; case LONG_TYPE: + checkAvailable(8); result = readLongImpl(); break; case FLOAT_TYPE: + checkAvailable(4); result = readFloatImpl(); break; case DOUBLE_TYPE: + checkAvailable(8); result = readDoubleImpl(); break; case STRING_TYPE: + checkAvailable(1); result = readStringImpl(); break; } @@ -360,63 +594,54 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeBoolean(boolean b) throws JMSException { - checkWritable(); writeTypeDiscriminator(BOOLEAN_TYPE); _data.put(b ? (byte) 1 : (byte) 0); } public void writeByte(byte b) throws JMSException { - checkWritable(); writeTypeDiscriminator(BYTE_TYPE); _data.put(b); } public void writeShort(short i) throws JMSException { - checkWritable(); writeTypeDiscriminator(SHORT_TYPE); _data.putShort(i); } public void writeChar(char c) throws JMSException { - checkWritable(); writeTypeDiscriminator(CHAR_TYPE); _data.putChar(c); } public void writeInt(int i) throws JMSException { - checkWritable(); writeTypeDiscriminator(INT_TYPE); _data.putInt(i); } public void writeLong(long l) throws JMSException { - checkWritable(); writeTypeDiscriminator(LONG_TYPE); _data.putLong(l); } public void writeFloat(float v) throws JMSException { - checkWritable(); writeTypeDiscriminator(FLOAT_TYPE); _data.putFloat(v); } public void writeDouble(double v) throws JMSException { - checkWritable(); writeTypeDiscriminator(DOUBLE_TYPE); _data.putDouble(v); } public void writeString(String string) throws JMSException { - checkWritable(); writeTypeDiscriminator(STRING_TYPE); try { @@ -434,13 +659,11 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess public void writeBytes(byte[] bytes) throws JMSException { - checkWritable(); writeBytes(bytes, 0, bytes == null?0:bytes.length); } public void writeBytes(byte[] bytes, int offset, int length) throws JMSException { - checkWritable(); writeTypeDiscriminator(BYTEARRAY_TYPE); if (bytes == null) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java index af7856a78a..ef00f0b9f2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java @@ -24,10 +24,7 @@ import junit.framework.TestCase; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.TestMessageHelper; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import javax.jms.MessageFormatException; -import javax.jms.MessageEOFException; +import javax.jms.*; import java.util.HashMap; /** @@ -240,7 +237,7 @@ public class StreamMessageTest extends TestCase len = bm.readBytes(result); assertEquals(1, len); len = bm.readBytes(result); - assertEquals(2, len); + assertEquals(2, len); } public void testEOFByte() throws Exception @@ -418,7 +415,7 @@ public class StreamMessageTest extends TestCase fail("expected MessageEOFException, got " + e); } } - + public void testToBodyStringWithNull() throws Exception { JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); @@ -427,6 +424,174 @@ public class StreamMessageTest extends TestCase assertNull(result); } + private void checkConversionsFail(StreamMessage sm, int[] conversions) throws JMSException + { + for (int conversion : conversions) + { + try + { + switch (conversion) + { + case 0: + sm.readBoolean(); + break; + case 1: + sm.readByte(); + break; + case 2: + sm.readShort(); + break; + case 3: + sm.readChar(); + break; + case 4: + sm.readInt(); + break; + case 5: + sm.readLong(); + break; + case 6: + sm.readFloat(); + break; + case 7: + sm.readDouble(); + break; + case 8: + sm.readString(); + break; + case 9: + sm.readBytes(new byte[3]); + break; + } + fail("MessageFormatException was not thrown"); + } + catch (MessageFormatException e) + { + // PASS + } + sm.reset(); + } + } + public void testBooleanConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeBoolean(true); + bm.reset(); + String result = bm.readString(); + assertEquals("true", result); + bm.reset(); + checkConversionsFail(bm, new int[]{1,2,3,4,5,6,7,9}); + } + + public void testByteConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeByte((byte) 43); + bm.reset(); + assertEquals(43, bm.readShort()); + bm.reset(); + assertEquals(43, bm.readInt()); + bm.reset(); + assertEquals(43, bm.readLong()); + bm.reset(); + String result = bm.readString(); + assertEquals("43", result); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 3, 6, 7, 9}); + } + + public void testShortConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeShort((short) 87); + bm.reset(); + assertEquals(87, bm.readInt()); + bm.reset(); + assertEquals(87, bm.readLong()); + bm.reset(); + assertEquals("87", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 3, 6, 7, }); + } + + public void testCharConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeChar('d'); + bm.reset(); + assertEquals("d", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 4, 5, 6, 7, 9}); + } + + public void testIntConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeInt(167); + bm.reset(); + assertEquals(167, bm.readLong()); + bm.reset(); + assertEquals("167", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 6, 7, 9}); + } + + public void testLongConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeLong(1678); + bm.reset(); + assertEquals("1678", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 6, 7, 9}); + } + + public void testFloatConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeFloat(6.2f); + bm.reset(); + assertEquals(6.2d, bm.readDouble(), 0.01); + bm.reset(); + assertEquals("6.2", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 9}); + } + + public void testDoubleConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeDouble(88.35d); + bm.reset(); + assertEquals("88.35", bm.readString()); + bm.reset(); + checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 6, 9}); + } + + public void testStringConversions() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString("true"); + bm.reset(); + assertEquals(true, bm.readBoolean()); + bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString("2"); + bm.reset(); + assertEquals((byte)2, bm.readByte()); + bm.reset(); + assertEquals((short)2, bm.readShort()); + bm.reset(); + assertEquals((int)2, bm.readInt()); + bm.reset(); + assertEquals((long)2, bm.readLong()); + bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString("5.7"); + bm.reset(); + assertEquals(5.7f, bm.readFloat()); + bm.reset(); + assertEquals(5.7d, bm.readDouble()); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(StreamMessageTest.class); |