summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-12 12:54:32 +0000
committerRobert Greig <rgreig@apache.org>2006-12-12 12:54:32 +0000
commit42da34c3320485e2e88f5d4169a4f92a48c1eb7a (patch)
tree7dad7adb4c09b45c887fa29cb44f05c2127663aa /java/client/src
parentb1b8bf3498f608bf4e89c365c5e36ec895d6b18a (diff)
downloadqpid-python-42da34c3320485e2e88f5d4169a4f92a48c1eb7a.tar.gz
QPID-102: fixed reset(), null handling and readBytes() processing in StreamMessage.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@486118 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java611
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java57
4 files changed, 425 insertions, 283 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
index 77b3bd7566..bec0686ce4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -21,23 +21,20 @@
package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
import javax.jms.MessageEOFException;
-import javax.jms.MessageNotWriteableException;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.nio.charset.CharacterCodingException;
/**
* @author Apache Software Foundation
*/
public abstract class AbstractBytesMessage extends AbstractJMSMessage
-{
+{
/**
* The default initial size of the buffer. The buffer expands automatically.
@@ -79,7 +76,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
{
_data.clear();
}
-
+
public String toBodyString() throws JMSException
{
checkReadable();
@@ -124,7 +121,7 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
return data;
}
}
-
+
/**
* Check that there is at least a certain number of bytes available to read
*
@@ -138,10 +135,4 @@ public abstract class AbstractBytesMessage extends AbstractJMSMessage
throw new MessageEOFException("Unable to read " + len + " bytes");
}
}
-
- public void reset() throws JMSException
- {
- super.reset();
- _data.flip();
- }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 514287aea7..6bd4fd0297 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -384,15 +384,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
}
public void acknowledge() throws JMSException
- {
+ {
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_session != null)
{
if (_session.getAMQConnection().isClosed()){
throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
+ }
+
// we set multiple to true here since acknowledgement implies acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);
@@ -546,7 +546,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
public void reset() throws JMSException
{
- _readableMessage = true;
+ if (_readableMessage)
+ {
+ _data.rewind();
+ }
+ else
+ {
+ _data.flip();
+ _readableMessage = true;
+ }
}
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index ccb3c0bf57..04f3c5ee17 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -33,7 +33,7 @@ import java.nio.charset.Charset;
*/
public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
{
- public static final String MIME_TYPE="jms/stream-message";
+ public static final String MIME_TYPE="jms/stream-message";
private static final byte BOOLEAN_TYPE = (byte) 1;
@@ -55,6 +55,8 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
private static final byte STRING_TYPE = (byte) 10;
+ private static final byte NULL_STRING_TYPE = (byte) 11;
+
/**
* This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
* a byte array in multiple chunks, hence this is used to track how much is left to be read
@@ -89,7 +91,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
return MIME_TYPE;
}
- private byte readAndCheckType() throws MessageFormatException, MessageEOFException,
+ private byte readWireType() throws MessageFormatException, MessageEOFException,
MessageNotReadableException
{
checkReadable();
@@ -105,22 +107,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public boolean readBoolean() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
boolean result;
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private boolean readBooleanImpl()
@@ -130,20 +142,30 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public byte readByte() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
byte result;
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -155,24 +177,34 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public short readShort() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
short result;
- switch (wireType)
+ try
{
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -190,15 +222,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
*/
public char readChar() throws JMSException
{
- byte wireType = readAndCheckType();
- if (wireType != CHAR_TYPE)
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
{
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
}
- else
+ catch (RuntimeException e)
{
- checkAvailable(2);
- return readCharImpl();
+ _data.position(position);
+ throw e;
}
}
@@ -209,30 +251,40 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public int readInt() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
int result;
- switch (wireType)
+ try
{
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private int readIntImpl()
@@ -242,34 +294,44 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public long readLong() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
long result;
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private long readLongImpl()
@@ -279,22 +341,32 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public float readFloat() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
float result;
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private float readFloatImpl()
@@ -304,26 +376,36 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public double readDouble() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
double result;
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private double readDoubleImpl()
@@ -333,50 +415,63 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public String readString() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
String result;
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private String readStringImpl() throws JMSException
@@ -406,7 +501,7 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
// type discriminator checked separately so you get a MessageFormatException rather than
// an EOF even in the case where both would be applicable
checkAvailable(1);
- byte wireType = readAndCheckType();
+ byte wireType = readWireType();
if (wireType != BYTEARRAY_TYPE)
{
throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
@@ -431,18 +526,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
}
}
}
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
- return readBytesImpl(bytes);
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
}
private int readBytesImpl(byte[] bytes)
{
int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
_byteArrayRemaining -= count;
- if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- }
+
if (count == 0)
{
return 0;
@@ -456,62 +558,74 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public Object readObject() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
Object result = null;
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ result = new byte[size];
+ readBytesImpl(new byte[size]);
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- result = new byte[size];
- readBytesImpl(new byte[size]);
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
public void writeBoolean(boolean b) throws JMSException
@@ -564,18 +678,25 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeString(String string) throws JMSException
{
- writeTypeDiscriminator(STRING_TYPE);
- try
+ if (string == null)
{
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte)0);
+ writeTypeDiscriminator(NULL_STRING_TYPE);
}
- catch (CharacterCodingException e)
+ else
{
- JMSException ex = new JMSException("Unable to encode string: " + e);
- ex.setLinkedException(e);
- throw ex;
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte)0);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
}
@@ -601,11 +722,17 @@ public class JMSStreamMessage extends AbstractBytesMessage implements StreamMess
public void writeObject(Object object) throws JMSException
{
checkWritable();
+ Class clazz = null;
if (object == null)
{
- throw new NullPointerException("Argument must not be null");
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
}
- Class clazz = object.getClass();
+
if (clazz == Byte.class)
{
writeByte((Byte) object);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
index ef00f0b9f2..727881de96 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
@@ -168,24 +168,6 @@ public class StreamMessageTest extends TestCase
}
}
- public void testWriteObjectThrowsNPE() throws Exception
- {
- try
- {
- JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
- bm.writeObject(null);
- fail("expected exception did not occur");
- }
- catch (NullPointerException n)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NullPointerException, got " + e);
- }
- }
-
public void testReadBoolean() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -221,9 +203,34 @@ public class StreamMessageTest extends TestCase
len = bm.readBytes(bytes);
assertEquals(-1, len);
len = bm.readBytes(bytes);
+ assertEquals(-1, len);
+ len = bm.readBytes(bytes);
assertEquals(0, len);
}
+ public void testReadBytesFollowedByPrimitive() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7, 8});
+ bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7});
+ bm.writeString("Foo");
+ bm.reset();
+ int len;
+ do
+ {
+ len = bm.readBytes(new byte[2]);
+ }
+ while (len == 2);
+
+ do
+ {
+ len = bm.readBytes(new byte[2]);
+ }
+ while (len == 2);
+
+ assertEquals("Foo", bm.readString());
+ }
+
public void testReadMultipleByteArrays() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -577,11 +584,11 @@ public class StreamMessageTest extends TestCase
bm = TestMessageHelper.newJMSStreamMessage();
bm.writeString("2");
bm.reset();
- assertEquals((byte)2, bm.readByte());
+ assertEquals((byte)2, bm.readByte());
bm.reset();
assertEquals((short)2, bm.readShort());
bm.reset();
- assertEquals((int)2, bm.readInt());
+ assertEquals(2, bm.readInt());
bm.reset();
assertEquals((long)2, bm.readLong());
bm = TestMessageHelper.newJMSStreamMessage();
@@ -592,6 +599,16 @@ public class StreamMessageTest extends TestCase
assertEquals(5.7d, bm.readDouble());
}
+ public void testNulls() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString(null);
+ bm.writeObject(null);
+ bm.reset();
+ assertNull(bm.readObject());
+ assertNull(bm.readObject());
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(StreamMessageTest.class);