summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-09 14:50:26 +0000
committerRobert Greig <rgreig@apache.org>2006-12-09 14:50:26 +0000
commitf616a17577e96442ec43de0afe87cd3e0704ee3b (patch)
tree3dd42752e3e08ad3ee2d0cf9f765407bb4c5b6e5 /java
parent92a90067b4ea4240a0303100f46130f0ad612898 (diff)
downloadqpid-python-f616a17577e96442ec43de0afe87cd3e0704ee3b.tar.gz
QPID-102 Addition of StreamMessage support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@484987 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java147
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java154
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java509
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java41
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java5
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java156
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java434
9 files changed, 1358 insertions, 107 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3f0aee23a0..8f90913e5c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.*;
@@ -367,13 +368,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public StreamMessage createStreamMessage() throws JMSException
{
- checkNotClosed();
- throw new UnsupportedOperationException("Stream messages not supported");
+ synchronized (_connection.getFailoverMutex())
+ {
+ checkNotClosed();
+
+ try
+ {
+ return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException("Unable to create text message: " + e);
+ }
+ }
}
public TextMessage createTextMessage() throws JMSException
{
- synchronized(_connection.getFailoverMutex())
+ synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
new file mode 100644
index 0000000000..77b3bd7566
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageNotWriteableException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
+
+/**
+ * @author Apache Software Foundation
+ */
+public abstract class AbstractBytesMessage extends AbstractJMSMessage
+{
+
+ /**
+ * The default initial size of the buffer. The buffer expands automatically.
+ */
+ private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
+
+ AbstractBytesMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a bytes message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ AbstractBytesMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ getJmsContentHeaderProperties().setContentType(getMimeType());
+
+ if (_data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
+ _data.setAutoExpand(true);
+ }
+ }
+
+ AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
+ throws AMQException
+ {
+ // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+ getJmsContentHeaderProperties().setContentType(getMimeType());
+ }
+
+ public void clearBodyImpl() throws JMSException
+ {
+ _data.clear();
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return getText();
+ }
+ catch (IOException e)
+ {
+ throw new JMSException(e.toString());
+ }
+ }
+
+ /**
+ * We reset the stream before and after reading the data. This means that toString() will always output
+ * the entire message and also that the caller can then immediately start reading as if toString() had
+ * never been called.
+ *
+ * @return
+ * @throws IOException
+ */
+ private String getText() throws IOException
+ {
+ // this will use the default platform encoding
+ if (_data == null)
+ {
+ return null;
+ }
+ int pos = _data.position();
+ _data.rewind();
+ // one byte left is for the end of frame marker
+ if (_data.remaining() == 0)
+ {
+ // this is really redundant since pos must be zero
+ _data.position(pos);
+ return null;
+ }
+ else
+ {
+ String data = _data.getString(Charset.forName("UTF8").newDecoder());
+ _data.position(pos);
+ return data;
+ }
+ }
+
+ /**
+ * Check that there is at least a certain number of bytes available to read
+ *
+ * @param len the number of bytes
+ * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
+ */
+ protected void checkAvailable(int len) throws MessageEOFException
+ {
+ if (_data.remaining() < len)
+ {
+ throw new MessageEOFException("Unable to read " + len + " bytes");
+ }
+ }
+
+ public void reset() throws JMSException
+ {
+ super.reset();
+ _data.flip();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 6e1958e40a..456d4d520c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,29 +20,21 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.AMQException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import javax.jms.BytesMessage;
import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageFormatException;
import javax.jms.MessageEOFException;
-import java.io.*;
-import java.nio.charset.Charset;
import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
-public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage
+public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
{
private static final String MIME_TYPE = "application/octet-stream";
-
- /**
- * The default initial size of the buffer. The buffer expands automatically.
- */
- private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
-
JMSBytesMessage()
{
this(null);
@@ -57,71 +49,12 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt
JMSBytesMessage(ByteBuffer data)
{
super(data); // this instanties a content header
- getJmsContentHeaderProperties().setContentType(MIME_TYPE);
-
- if (_data == null)
- {
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
- _data.setAutoExpand(true);
- }
}
JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
throws AMQException
{
- // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
- super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
- getJmsContentHeaderProperties().setContentType(MIME_TYPE);
- }
-
- public void clearBodyImpl() throws JMSException
- {
- _data.clear();
- }
-
- public String toBodyString() throws JMSException
- {
- checkReadable();
- try
- {
- return getText();
- }
- catch (IOException e)
- {
- throw new JMSException(e.toString());
- }
- }
-
- /**
- * We reset the stream before and after reading the data. This means that toString() will always output
- * the entire message and also that the caller can then immediately start reading as if toString() had
- * never been called.
- *
- * @return
- * @throws IOException
- */
- private String getText() throws IOException
- {
- // this will use the default platform encoding
- if (_data == null)
- {
- return null;
- }
- int pos = _data.position();
- _data.rewind();
- // one byte left is for the end of frame marker
- if (_data.remaining() == 0)
- {
- // this is really redundant since pos must be zero
- _data.position(pos);
- return null;
- }
- else
- {
- String data = _data.getString(Charset.forName("UTF8").newDecoder());
- _data.position(pos);
- return data;
- }
+ super(messageNbr, contentHeader, data);
}
public String getMimeType()
@@ -135,21 +68,6 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt
return _data.limit();
}
-
- /**
- * Check that there is at least a certain number of bytes available to read
- *
- * @param len the number of bytes
- * @throws MessageEOFException if there are less than len bytes available to read
- */
- private void checkAvailable(int len) throws MessageEOFException
- {
- if (_data.remaining() < len)
- {
- throw new MessageEOFException("Unable to read " + len + " bytes");
- }
- }
-
public boolean readBoolean() throws JMSException
{
checkReadable();
@@ -340,6 +258,8 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt
try
{
_data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must add the null terminator manually
+ _data.put((byte)0);
}
catch (CharacterCodingException e)
{
@@ -368,13 +288,51 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt
{
throw new NullPointerException("Argument must not be null");
}
- _data.putObject(object);
- }
-
- public void reset() throws JMSException
- {
- super.reset();
- _data.flip();
+ Class clazz = object.getClass();
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeUTF((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
new file mode 100644
index 0000000000..cc820a5623
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -0,0 +1,509 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
+{
+ public static final String MIME_TYPE="jms/stream-message";
+
+ private static final String[] _typeNames = { "boolean",
+ "byte",
+ "byte array",
+ "short",
+ "char",
+ "int",
+ "long",
+ "float",
+ "double",
+ "utf string" };
+
+ private static final byte BOOLEAN_TYPE = (byte) 1;
+
+ private static final byte BYTE_TYPE = (byte) 2;
+
+ private static final byte BYTEARRAY_TYPE = (byte) 3;
+
+ private static final byte SHORT_TYPE = (byte) 4;
+
+ private static final byte CHAR_TYPE = (byte) 5;
+
+ private static final byte INT_TYPE = (byte) 6;
+
+ private static final byte LONG_TYPE = (byte) 7;
+
+ private static final byte FLOAT_TYPE = (byte) 8;
+
+ private static final byte DOUBLE_TYPE = (byte) 9;
+
+ private static final byte STRING_TYPE = (byte) 10;
+
+ /**
+ * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
+ * a byte array in multiple chunks, hence this is used to track how much is left to be read
+ */
+ private int _byteArrayRemaining = -1;
+
+ JMSStreamMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a stream message with existing data.
+ *
+ * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+ * set to auto expand
+ */
+ JMSStreamMessage(ByteBuffer data)
+ {
+ super(data); // this instanties a content header
+ }
+
+
+ JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
+ throws AMQException
+ {
+ super(messageNbr, contentHeader, data);
+ }
+
+ public String getMimeType()
+ {
+ return MIME_TYPE;
+ }
+
+ private void readAndCheckType(byte type) throws MessageFormatException
+ {
+ if (_data.get() != type)
+ {
+ throw new MessageFormatException("Type " + _typeNames[type - 1] + " not found next in stream");
+ }
+ }
+
+ private void writeTypeDiscriminator(byte type)
+ {
+ _data.put(type);
+ }
+
+ public boolean readBoolean() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(2);
+ readAndCheckType(BOOLEAN_TYPE);
+ return readBooleanImpl();
+ }
+
+ private boolean readBooleanImpl()
+ {
+ return _data.get() != 0;
+ }
+
+ public byte readByte() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(2);
+ readAndCheckType(BYTE_TYPE);
+ return readByteImpl();
+ }
+
+ private byte readByteImpl()
+ {
+ return _data.get();
+ }
+
+ public short readShort() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(3);
+ readAndCheckType(SHORT_TYPE);
+ return readShortImpl();
+ }
+
+ private short readShortImpl()
+ {
+ return _data.getShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws JMSException
+ */
+ public char readChar() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(3);
+ readAndCheckType(CHAR_TYPE);
+ return readCharImpl();
+ }
+
+ private char readCharImpl()
+ {
+ return _data.getChar();
+ }
+
+ public int readInt() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(5);
+ readAndCheckType(INT_TYPE);
+ return readIntImpl();
+ }
+
+ private int readIntImpl()
+ {
+ return _data.getInt();
+ }
+
+ public long readLong() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(9);
+ readAndCheckType(LONG_TYPE);
+ return readLongImpl();
+ }
+
+ private long readLongImpl()
+ {
+ return _data.getLong();
+ }
+
+ public float readFloat() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(5);
+ readAndCheckType(FLOAT_TYPE);
+ return readFloatImpl();
+ }
+
+ private float readFloatImpl()
+ {
+ return _data.getFloat();
+ }
+
+ public double readDouble() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(9);
+ readAndCheckType(DOUBLE_TYPE);
+ return readDoubleImpl();
+ }
+
+ private double readDoubleImpl()
+ {
+ return _data.getDouble();
+ }
+
+ public String readString() throws JMSException
+ {
+ checkReadable();
+ // we check only for one byte plus the type byte since theoretically the string could be only a
+ // single byte when using UTF-8 encoding
+ checkAvailable(2);
+ readAndCheckType(STRING_TYPE);
+ return readStringImpl();
+ }
+
+ private String readStringImpl() throws JMSException
+ {
+ try
+ {
+ return _data.getString(Charset.forName("UTF-8").newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+ public int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ checkReadable();
+ // first call
+ if (_byteArrayRemaining == -1)
+ {
+ // type discriminator plus array size
+ checkAvailable(5);
+ readAndCheckType(BYTEARRAY_TYPE);
+ int size = _data.getInt();
+ // size of -1 indicates null
+ if (size == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ if (size > _data.remaining())
+ {
+ throw new MessageEOFException("Byte array has stated size " + size + " but message only contains " +
+ _data.remaining() + " bytes");
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ }
+ }
+ }
+
+ return readBytesImpl(bytes);
+ }
+
+ private int readBytesImpl(byte[] bytes)
+ {
+ int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
+ _byteArrayRemaining -= count;
+ if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ }
+ if (count == 0)
+ {
+ return 0;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ public Object readObject() throws JMSException
+ {
+ checkReadable();
+ checkAvailable(1);
+ byte type = _data.get();
+ Object result = null;
+ switch (type)
+ {
+ case BOOLEAN_TYPE:
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ result = new byte[size];
+ readBytesImpl(new byte[size]);
+ }
+ break;
+ case SHORT_TYPE:
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ result = readDoubleImpl();
+ break;
+ case STRING_TYPE:
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+
+ public void writeBoolean(boolean b) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(BOOLEAN_TYPE);
+ _data.put(b ? (byte) 1 : (byte) 0);
+ }
+
+ public void writeByte(byte b) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(BYTE_TYPE);
+ _data.put(b);
+ }
+
+ public void writeShort(short i) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(SHORT_TYPE);
+ _data.putShort(i);
+ }
+
+ public void writeChar(char c) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(CHAR_TYPE);
+ _data.putChar(c);
+ }
+
+ public void writeInt(int i) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(INT_TYPE);
+ _data.putInt(i);
+ }
+
+ public void writeLong(long l) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(LONG_TYPE);
+ _data.putLong(l);
+ }
+
+ public void writeFloat(float v) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(FLOAT_TYPE);
+ _data.putFloat(v);
+ }
+
+ public void writeDouble(double v) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(DOUBLE_TYPE);
+ _data.putDouble(v);
+ }
+
+ public void writeString(String string) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte)0);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
+
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ checkWritable();
+ writeBytes(bytes, 0, bytes == null?0:bytes.length);
+ }
+
+ public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ checkWritable();
+ writeTypeDiscriminator(BYTEARRAY_TYPE);
+ if (bytes == null)
+ {
+ _data.putInt(-1);
+ }
+ else
+ {
+ _data.putInt(length);
+ _data.put(bytes, offset, length);
+ }
+ }
+
+ public void writeObject(Object object) throws JMSException
+ {
+ checkWritable();
+ if (object == null)
+ {
+ throw new NullPointerException("Argument must not be null");
+ }
+ Class clazz = object.getClass();
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeString((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
new file mode 100644
index 0000000000..aae9f0cdb2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+
+import javax.jms.JMSException;
+
+public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
+{
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws
+ AMQException
+ {
+ return new JMSStreamMessage(deliveryTag, contentHeader, data);
+ }
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSStreamMessage();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 31c9c2ed91..348988f06d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -103,6 +103,7 @@ public class MessageFactoryRegistry
mf.registerFactory("text/xml", new JMSTextMessageFactory());
mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory());
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+ mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
mf.registerFactory(null, new JMSBytesMessageFactory());
return mf;
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
index c34dbf14f1..9b477c19e2 100644
--- a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
+++ b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
@@ -38,4 +38,9 @@ public class TestMessageHelper
{
return new JMSMapMessage();
}
+
+ public static JMSStreamMessage newJMSStreamMessage()
+ {
+ return new JMSStreamMessage();
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
index 2a76c920b1..7ffb3ca469 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.test.unit.client.message;
+import junit.framework.TestCase;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.message.TestMessageHelper;
+import javax.jms.MessageEOFException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import javax.jms.MessageEOFException;
-
-import junit.framework.TestCase;
+import javax.jms.MessageFormatException;
+import java.util.HashMap;
public class BytesMessageTest extends TestCase
{
@@ -82,6 +83,18 @@ public class BytesMessageTest extends TestCase
bm.writeInt(10);
}
+ public void testWriteBoolean() throws Exception
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ bm.writeBoolean(true);
+ bm.writeBoolean(false);
+ bm.reset();
+ boolean val = bm.readBoolean();
+ assertEquals(true, val);
+ val = bm.readBoolean();
+ assertEquals(false, val);
+ }
+
public void testWriteInt() throws Exception
{
JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
@@ -102,6 +115,61 @@ public class BytesMessageTest extends TestCase
assertEquals("Bananas", res);
}
+ public void testWriteBytes() throws Exception
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ byte[] bytes = {1,2,3,4};
+ bm.writeBytes(bytes, 1, 2);
+ bm.reset();
+ bytes = new byte[2];
+ bm.readBytes(bytes);
+ assertEquals(2, bytes[0]);
+ assertEquals(3, bytes[1]);
+ }
+
+ public void testWriteObject() throws Exception
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ bm.writeObject(new Boolean(true));
+ bm.writeObject(new Boolean(false));
+ bm.writeObject(new Byte((byte)2));
+ bm.writeObject(new byte[]{1,2,3,4});
+ bm.writeObject(new Character('g'));
+ bm.writeObject(new Short((short) 29));
+ bm.writeObject(new Integer(101));
+ bm.writeObject(new Long(50003222L));
+ bm.writeObject("Foobar");
+ bm.writeObject(new Float(1.7f));
+ bm.writeObject(new Double(8.7d));
+ bm.reset();
+ assertTrue(bm.readBoolean());
+ assertTrue(!bm.readBoolean());
+ assertEquals((byte)2, bm.readByte());
+ byte[] bytes = new byte[4];
+ bm.readBytes(bytes);
+ assertEquals('g', bm.readChar());
+ assertEquals((short) 29, bm.readShort());
+ assertEquals(101, bm.readInt());
+ assertEquals(50003222L, bm.readLong());
+ assertEquals("Foobar", bm.readUTF());
+ assertEquals(1.7f, bm.readFloat());
+ assertEquals(8.7d, bm.readDouble());
+ }
+
+ public void testWriteObjectRejectsNonPrimitives() throws Exception
+ {
+ try
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ bm.writeObject(new HashMap());
+ fail("expected MessageFormatException was not thrown");
+ }
+ catch (MessageFormatException e)
+ {
+ // pass
+ }
+ }
+
public void testWriteObjectThrowsNPE() throws Exception
{
try
@@ -126,7 +194,83 @@ public class BytesMessageTest extends TestCase
bm.writeBoolean(true);
bm.reset();
boolean result = bm.readBoolean();
- assertTrue(result);
+ assertTrue(result);
+ }
+
+ public void testReadUnsignedByte() throws Exception
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ bm.writeByte((byte) 9);
+ bm.reset();
+ int result = bm.readUnsignedByte();
+ assertEquals(9, result);
+ }
+
+ public void testReadUnsignedShort() throws Exception
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ bm.writeShort((byte) 9);
+ bm.reset();
+ int result = bm.readUnsignedShort();
+ assertEquals(9, result);
+ }
+
+ public void testReadBytesChecksNull() throws Exception
+ {
+ try
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ bm.readBytes(null);
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ try
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ bm.readBytes(null, 1);
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+ }
+
+ public void testReadBytesChecksMaxSize() throws Exception
+ {
+ try
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ byte[] bytes = new byte[100];
+ bm.readBytes(bytes, 120);
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+ }
+
+ public void testReadBytesReturnsCorrectLengths() throws Exception
+ {
+ JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage();
+ byte[] bytes = {2, 3};
+ bm.writeBytes(bytes);
+ bm.reset();
+ int len = bm.readBytes(bytes);
+ assertEquals(2, len);
+ len = bm.readBytes(bytes);
+ assertEquals(-1, len);
+ len = bm.readBytes(bytes, 2);
+ assertEquals(-1, len);
+ bm.reset();
+ len = bm.readBytes(bytes, 2);
+ assertEquals(2, len);
+ bm.reset();
+ len = bm.readBytes(bytes, 1);
+ assertEquals(1, len);
+
}
public void testEOFByte() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
new file mode 100644
index 0000000000..af7856a78a
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
@@ -0,0 +1,434 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.message;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.message.JMSStreamMessage;
+import org.apache.qpid.client.message.TestMessageHelper;
+
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageEOFException;
+import java.util.HashMap;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class StreamMessageTest extends TestCase
+{
+ /**
+ * Tests that on creation a call to getBodyLength() throws an exception
+ * if null was passed in during creation
+ */
+ public void testNotReadableOnCreationWithNull() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.readByte();
+ fail("expected exception did not occur");
+ }
+ catch (MessageNotReadableException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageNotReadableException, got " + e);
+ }
+ }
+
+ public void testResetMakesReadble() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeInt(10);
+ bm.reset();
+ bm.writeInt(12);
+ fail("expected exception did not occur");
+ }
+ catch (MessageNotWriteableException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageNotWriteableException, got " + e);
+ }
+ }
+
+ public void testClearBodyMakesWritable() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeInt(10);
+ bm.reset();
+ bm.clearBody();
+ bm.writeInt(10);
+ }
+
+ public void testWriteBoolean() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBoolean(true);
+ bm.writeBoolean(false);
+ bm.reset();
+ boolean val = bm.readBoolean();
+ assertEquals(true, val);
+ val = bm.readBoolean();
+ assertEquals(false, val);
+ }
+
+ public void testWriteInt() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeInt(10);
+ bm.reset();
+ int val = bm.readInt();
+ assertTrue(val == 10);
+ }
+
+ public void testWriteString() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString("Bananas");
+ bm.reset();
+ String res = bm.readString();
+ assertEquals("Bananas", res);
+ }
+
+ public void testWriteBytes() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ byte[] bytes = {1,2,3,4};
+ bm.writeBytes(bytes, 1, 2);
+ bm.reset();
+ bytes = new byte[2];
+ bm.readBytes(bytes);
+ assertEquals(2, bytes[0]);
+ assertEquals(3, bytes[1]);
+ }
+
+ public void testWriteObject() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeObject(new Boolean(true));
+ bm.writeObject(new Boolean(false));
+ bm.writeObject(new Byte((byte)2));
+ bm.writeObject(new byte[]{1,2,3,4});
+ bm.writeObject(new Character('g'));
+ bm.writeObject(new Short((short) 29));
+ bm.writeObject(new Integer(101));
+ bm.writeObject(new Long(50003222L));
+ bm.writeObject("Foobar");
+ bm.writeObject(new Float(1.7f));
+ bm.writeObject(new Double(8.7d));
+ bm.reset();
+ assertTrue(bm.readBoolean());
+ assertTrue(!bm.readBoolean());
+ assertEquals((byte)2, bm.readByte());
+ byte[] bytes = new byte[4];
+ bm.readBytes(bytes);
+ assertEquals('g', bm.readChar());
+ assertEquals((short) 29, bm.readShort());
+ assertEquals(101, bm.readInt());
+ assertEquals(50003222L, bm.readLong());
+ assertEquals("Foobar", bm.readString());
+ assertEquals(1.7f, bm.readFloat());
+ assertEquals(8.7d, bm.readDouble());
+ }
+
+ public void testWriteObjectRejectsNonPrimitives() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeObject(new HashMap());
+ fail("expected MessageFormatException was not thrown");
+ }
+ catch (MessageFormatException e)
+ {
+ // pass
+ }
+ }
+
+ public void testWriteObjectThrowsNPE() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeObject(null);
+ fail("expected exception did not occur");
+ }
+ catch (NullPointerException n)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected NullPointerException, got " + e);
+ }
+ }
+
+ public void testReadBoolean() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBoolean(true);
+ bm.reset();
+ boolean result = bm.readBoolean();
+ assertTrue(result);
+ }
+
+ public void testReadBytesChecksNull() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.readBytes(null);
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+ }
+
+ public void testReadBytesReturnsCorrectLengths() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ byte[] bytes = {2, 3};
+ bm.writeBytes(bytes);
+ bm.writeBytes(null);
+ bm.writeBytes(new byte[]{});
+ bm.reset();
+ int len = bm.readBytes(bytes);
+ assertEquals(2, len);
+ len = bm.readBytes(bytes);
+ assertEquals(-1, len);
+ len = bm.readBytes(bytes);
+ assertEquals(0, len);
+ }
+
+ public void testReadMultipleByteArrays() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ byte[] bytes = {2, 3, 4};
+ bm.writeBytes(bytes);
+ bm.writeBytes(bytes);
+ bm.reset();
+ byte[] result = new byte[2];
+ int len = bm.readBytes(result);
+ assertEquals(2, len);
+ len = bm.readBytes(result);
+ assertEquals(1, len);
+ len = bm.readBytes(result);
+ assertEquals(2, len);
+ }
+
+ public void testEOFByte() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeByte((byte)1);
+ bm.reset();
+ bm.readByte();
+ // should throw
+ bm.readByte();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testEOFBoolean() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBoolean(true);
+ bm.reset();
+ bm.readBoolean();
+ // should throw
+ bm.readBoolean();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testEOFChar() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeChar('A');
+ bm.reset();
+ bm.readChar();
+ // should throw
+ bm.readChar();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testEOFDouble() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeDouble(1.3d);
+ bm.reset();
+ bm.readDouble();
+ // should throw
+ bm.readDouble();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testEOFFloat() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeFloat(1.3f);
+ bm.reset();
+ bm.readFloat();
+ // should throw
+ bm.readFloat();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testEOFInt() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeInt(99);
+ bm.reset();
+ bm.readInt();
+ // should throw
+ bm.readInt();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testEOFLong() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeLong(4L);
+ bm.reset();
+ bm.readLong();
+ // should throw
+ bm.readLong();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testEOFShort() throws Exception
+ {
+ try
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeShort((short)4);
+ bm.reset();
+ bm.readShort();
+ // should throw
+ bm.readShort();
+ fail("expected exception did not occur");
+ }
+ catch (MessageEOFException m)
+ {
+ // ok
+ }
+ catch (Exception e)
+ {
+ fail("expected MessageEOFException, got " + e);
+ }
+ }
+
+ public void testToBodyStringWithNull() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.reset();
+ String result = bm.toBodyString();
+ assertNull(result);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(StreamMessageTest.class);
+ }
+}