diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-09 14:50:26 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-09 14:50:26 +0000 |
commit | f616a17577e96442ec43de0afe87cd3e0704ee3b (patch) | |
tree | 3dd42752e3e08ad3ee2d0cf9f765407bb4c5b6e5 /java | |
parent | 92a90067b4ea4240a0303100f46130f0ad612898 (diff) | |
download | qpid-python-f616a17577e96442ec43de0afe87cd3e0704ee3b.tar.gz |
QPID-102 Addition of StreamMessage support
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@484987 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
9 files changed, 1358 insertions, 107 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3f0aee23a0..8f90913e5c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.framing.*; @@ -367,13 +368,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public StreamMessage createStreamMessage() throws JMSException { - checkNotClosed(); - throw new UnsupportedOperationException("Stream messages not supported"); + synchronized (_connection.getFailoverMutex()) + { + checkNotClosed(); + + try + { + return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE); + } + catch (AMQException e) + { + throw new JMSException("Unable to create text message: " + e); + } + } } public TextMessage createTextMessage() throws JMSException { - synchronized(_connection.getFailoverMutex()) + synchronized (_connection.getFailoverMutex()) { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java new file mode 100644 index 0000000000..77b3bd7566 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageEOFException; +import javax.jms.MessageNotWriteableException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.CharacterCodingException; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesMessage extends AbstractJMSMessage +{ + + /** + * The default initial size of the buffer. The buffer expands automatically. + */ + private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + + AbstractBytesMessage() + { + this(null); + } + + /** + * Construct a bytes message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesMessage(ByteBuffer data) + { + super(data); // this instanties a content header + getJmsContentHeaderProperties().setContentType(getMimeType()); + + if (_data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); + _data.setAutoExpand(true); + } + } + + AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + throws AMQException + { + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); + getJmsContentHeaderProperties().setContentType(getMimeType()); + } + + public void clearBodyImpl() throws JMSException + { + _data.clear(); + } + + public String toBodyString() throws JMSException + { + checkReadable(); + try + { + return getText(); + } + catch (IOException e) + { + throw new JMSException(e.toString()); + } + } + + /** + * We reset the stream before and after reading the data. This means that toString() will always output + * the entire message and also that the caller can then immediately start reading as if toString() had + * never been called. + * + * @return + * @throws IOException + */ + private String getText() throws IOException + { + // this will use the default platform encoding + if (_data == null) + { + return null; + } + int pos = _data.position(); + _data.rewind(); + // one byte left is for the end of frame marker + if (_data.remaining() == 0) + { + // this is really redundant since pos must be zero + _data.position(pos); + return null; + } + else + { + String data = _data.getString(Charset.forName("UTF8").newDecoder()); + _data.position(pos); + return data; + } + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + protected void checkAvailable(int len) throws MessageEOFException + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } + + public void reset() throws JMSException + { + super.reset(); + _data.flip(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 6e1958e40a..456d4d520c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,29 +20,21 @@ */ package org.apache.qpid.client.message; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.AMQException; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; +import javax.jms.BytesMessage; import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; +import javax.jms.MessageFormatException; import javax.jms.MessageEOFException; -import java.io.*; -import java.nio.charset.Charset; import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; -public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage +public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { private static final String MIME_TYPE = "application/octet-stream"; - - /** - * The default initial size of the buffer. The buffer expands automatically. - */ - private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; - JMSBytesMessage() { this(null); @@ -57,71 +49,12 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt JMSBytesMessage(ByteBuffer data) { super(data); // this instanties a content header - getJmsContentHeaderProperties().setContentType(MIME_TYPE); - - if (_data == null) - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); - _data.setAutoExpand(true); - } } JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException { - // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - getJmsContentHeaderProperties().setContentType(MIME_TYPE); - } - - public void clearBodyImpl() throws JMSException - { - _data.clear(); - } - - public String toBodyString() throws JMSException - { - checkReadable(); - try - { - return getText(); - } - catch (IOException e) - { - throw new JMSException(e.toString()); - } - } - - /** - * We reset the stream before and after reading the data. This means that toString() will always output - * the entire message and also that the caller can then immediately start reading as if toString() had - * never been called. - * - * @return - * @throws IOException - */ - private String getText() throws IOException - { - // this will use the default platform encoding - if (_data == null) - { - return null; - } - int pos = _data.position(); - _data.rewind(); - // one byte left is for the end of frame marker - if (_data.remaining() == 0) - { - // this is really redundant since pos must be zero - _data.position(pos); - return null; - } - else - { - String data = _data.getString(Charset.forName("UTF8").newDecoder()); - _data.position(pos); - return data; - } + super(messageNbr, contentHeader, data); } public String getMimeType() @@ -135,21 +68,6 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt return _data.limit(); } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws MessageEOFException if there are less than len bytes available to read - */ - private void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } - public boolean readBoolean() throws JMSException { checkReadable(); @@ -340,6 +258,8 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt try { _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must add the null terminator manually + _data.put((byte)0); } catch (CharacterCodingException e) { @@ -368,13 +288,51 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt { throw new NullPointerException("Argument must not be null"); } - _data.putObject(object); - } - - public void reset() throws JMSException - { - super.reset(); - _data.flip(); + Class clazz = object.getClass(); + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeUTF((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java new file mode 100644 index 0000000000..cc820a5623 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -0,0 +1,509 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; +import org.apache.mina.common.ByteBuffer; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.StreamMessage; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +/** + * @author Apache Software Foundation + */ +public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage +{ + public static final String MIME_TYPE="jms/stream-message"; + + private static final String[] _typeNames = { "boolean", + "byte", + "byte array", + "short", + "char", + "int", + "long", + "float", + "double", + "utf string" }; + + private static final byte BOOLEAN_TYPE = (byte) 1; + + private static final byte BYTE_TYPE = (byte) 2; + + private static final byte BYTEARRAY_TYPE = (byte) 3; + + private static final byte SHORT_TYPE = (byte) 4; + + private static final byte CHAR_TYPE = (byte) 5; + + private static final byte INT_TYPE = (byte) 6; + + private static final byte LONG_TYPE = (byte) 7; + + private static final byte FLOAT_TYPE = (byte) 8; + + private static final byte DOUBLE_TYPE = (byte) 9; + + private static final byte STRING_TYPE = (byte) 10; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + JMSStreamMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + JMSStreamMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + throws AMQException + { + super(messageNbr, contentHeader, data); + } + + public String getMimeType() + { + return MIME_TYPE; + } + + private void readAndCheckType(byte type) throws MessageFormatException + { + if (_data.get() != type) + { + throw new MessageFormatException("Type " + _typeNames[type - 1] + " not found next in stream"); + } + } + + private void writeTypeDiscriminator(byte type) + { + _data.put(type); + } + + public boolean readBoolean() throws JMSException + { + checkReadable(); + checkAvailable(2); + readAndCheckType(BOOLEAN_TYPE); + return readBooleanImpl(); + } + + private boolean readBooleanImpl() + { + return _data.get() != 0; + } + + public byte readByte() throws JMSException + { + checkReadable(); + checkAvailable(2); + readAndCheckType(BYTE_TYPE); + return readByteImpl(); + } + + private byte readByteImpl() + { + return _data.get(); + } + + public short readShort() throws JMSException + { + checkReadable(); + checkAvailable(3); + readAndCheckType(SHORT_TYPE); + return readShortImpl(); + } + + private short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws JMSException + */ + public char readChar() throws JMSException + { + checkReadable(); + checkAvailable(3); + readAndCheckType(CHAR_TYPE); + return readCharImpl(); + } + + private char readCharImpl() + { + return _data.getChar(); + } + + public int readInt() throws JMSException + { + checkReadable(); + checkAvailable(5); + readAndCheckType(INT_TYPE); + return readIntImpl(); + } + + private int readIntImpl() + { + return _data.getInt(); + } + + public long readLong() throws JMSException + { + checkReadable(); + checkAvailable(9); + readAndCheckType(LONG_TYPE); + return readLongImpl(); + } + + private long readLongImpl() + { + return _data.getLong(); + } + + public float readFloat() throws JMSException + { + checkReadable(); + checkAvailable(5); + readAndCheckType(FLOAT_TYPE); + return readFloatImpl(); + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + public double readDouble() throws JMSException + { + checkReadable(); + checkAvailable(9); + readAndCheckType(DOUBLE_TYPE); + return readDoubleImpl(); + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + public String readString() throws JMSException + { + checkReadable(); + // we check only for one byte plus the type byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + checkAvailable(2); + readAndCheckType(STRING_TYPE); + return readStringImpl(); + } + + private String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + public int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator plus array size + checkAvailable(5); + readAndCheckType(BYTEARRAY_TYPE); + int size = _data.getInt(); + // size of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated size " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + + return readBytesImpl(bytes); + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + } + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public Object readObject() throws JMSException + { + checkReadable(); + checkAvailable(1); + byte type = _data.get(); + Object result = null; + switch (type) + { + case BOOLEAN_TYPE: + result = readBooleanImpl(); + break; + case BYTE_TYPE: + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + result = new byte[size]; + readBytesImpl(new byte[size]); + } + break; + case SHORT_TYPE: + result = readShortImpl(); + break; + case CHAR_TYPE: + result = readCharImpl(); + break; + case INT_TYPE: + result = readIntImpl(); + break; + case LONG_TYPE: + result = readLongImpl(); + break; + case FLOAT_TYPE: + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + result = readDoubleImpl(); + break; + case STRING_TYPE: + result = readStringImpl(); + break; + } + return result; + } + + public void writeBoolean(boolean b) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + public void writeByte(byte b) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + public void writeShort(short i) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + public void writeChar(char c) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + public void writeInt(int i) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(INT_TYPE); + _data.putInt(i); + } + + public void writeLong(long l) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + public void writeFloat(float v) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + public void writeDouble(double v) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + public void writeString(String string) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(STRING_TYPE); + try + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte)0); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + + public void writeBytes(byte[] bytes) throws JMSException + { + checkWritable(); + writeBytes(bytes, 0, bytes == null?0:bytes.length); + } + + public void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + + public void writeObject(Object object) throws JMSException + { + checkWritable(); + if (object == null) + { + throw new NullPointerException("Argument must not be null"); + } + Class clazz = object.getClass(); + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java new file mode 100644 index 0000000000..aae9f0cdb2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; + +public class JMSStreamMessageFactory extends AbstractJMSMessageFactory +{ + protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws + AMQException + { + return new JMSStreamMessage(deliveryTag, contentHeader, data); + } + + public AbstractJMSMessage createMessage() throws JMSException + { + return new JMSStreamMessage(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 31c9c2ed91..348988f06d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -103,6 +103,7 @@ public class MessageFactoryRegistry mf.registerFactory("text/xml", new JMSTextMessageFactory()); mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory()); mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); + mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); mf.registerFactory(null, new JMSBytesMessageFactory()); return mf; } diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java index c34dbf14f1..9b477c19e2 100644 --- a/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java +++ b/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java @@ -38,4 +38,9 @@ public class TestMessageHelper { return new JMSMapMessage(); } + + public static JMSStreamMessage newJMSStreamMessage() + { + return new JMSStreamMessage(); + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java index 2a76c920b1..7ffb3ca469 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,14 +20,15 @@ */ package org.apache.qpid.test.unit.client.message; +import junit.framework.TestCase; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.message.TestMessageHelper; +import javax.jms.MessageEOFException; import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; -import javax.jms.MessageEOFException; - -import junit.framework.TestCase; +import javax.jms.MessageFormatException; +import java.util.HashMap; public class BytesMessageTest extends TestCase { @@ -82,6 +83,18 @@ public class BytesMessageTest extends TestCase bm.writeInt(10); } + public void testWriteBoolean() throws Exception + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + bm.writeBoolean(true); + bm.writeBoolean(false); + bm.reset(); + boolean val = bm.readBoolean(); + assertEquals(true, val); + val = bm.readBoolean(); + assertEquals(false, val); + } + public void testWriteInt() throws Exception { JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); @@ -102,6 +115,61 @@ public class BytesMessageTest extends TestCase assertEquals("Bananas", res); } + public void testWriteBytes() throws Exception + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + byte[] bytes = {1,2,3,4}; + bm.writeBytes(bytes, 1, 2); + bm.reset(); + bytes = new byte[2]; + bm.readBytes(bytes); + assertEquals(2, bytes[0]); + assertEquals(3, bytes[1]); + } + + public void testWriteObject() throws Exception + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + bm.writeObject(new Boolean(true)); + bm.writeObject(new Boolean(false)); + bm.writeObject(new Byte((byte)2)); + bm.writeObject(new byte[]{1,2,3,4}); + bm.writeObject(new Character('g')); + bm.writeObject(new Short((short) 29)); + bm.writeObject(new Integer(101)); + bm.writeObject(new Long(50003222L)); + bm.writeObject("Foobar"); + bm.writeObject(new Float(1.7f)); + bm.writeObject(new Double(8.7d)); + bm.reset(); + assertTrue(bm.readBoolean()); + assertTrue(!bm.readBoolean()); + assertEquals((byte)2, bm.readByte()); + byte[] bytes = new byte[4]; + bm.readBytes(bytes); + assertEquals('g', bm.readChar()); + assertEquals((short) 29, bm.readShort()); + assertEquals(101, bm.readInt()); + assertEquals(50003222L, bm.readLong()); + assertEquals("Foobar", bm.readUTF()); + assertEquals(1.7f, bm.readFloat()); + assertEquals(8.7d, bm.readDouble()); + } + + public void testWriteObjectRejectsNonPrimitives() throws Exception + { + try + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + bm.writeObject(new HashMap()); + fail("expected MessageFormatException was not thrown"); + } + catch (MessageFormatException e) + { + // pass + } + } + public void testWriteObjectThrowsNPE() throws Exception { try @@ -126,7 +194,83 @@ public class BytesMessageTest extends TestCase bm.writeBoolean(true); bm.reset(); boolean result = bm.readBoolean(); - assertTrue(result); + assertTrue(result); + } + + public void testReadUnsignedByte() throws Exception + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + bm.writeByte((byte) 9); + bm.reset(); + int result = bm.readUnsignedByte(); + assertEquals(9, result); + } + + public void testReadUnsignedShort() throws Exception + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + bm.writeShort((byte) 9); + bm.reset(); + int result = bm.readUnsignedShort(); + assertEquals(9, result); + } + + public void testReadBytesChecksNull() throws Exception + { + try + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + bm.readBytes(null); + } + catch (IllegalArgumentException e) + { + // pass + } + + try + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + bm.readBytes(null, 1); + } + catch (IllegalArgumentException e) + { + // pass + } + } + + public void testReadBytesChecksMaxSize() throws Exception + { + try + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + byte[] bytes = new byte[100]; + bm.readBytes(bytes, 120); + } + catch (IllegalArgumentException e) + { + // pass + } + } + + public void testReadBytesReturnsCorrectLengths() throws Exception + { + JMSBytesMessage bm = TestMessageHelper.newJMSBytesMessage(); + byte[] bytes = {2, 3}; + bm.writeBytes(bytes); + bm.reset(); + int len = bm.readBytes(bytes); + assertEquals(2, len); + len = bm.readBytes(bytes); + assertEquals(-1, len); + len = bm.readBytes(bytes, 2); + assertEquals(-1, len); + bm.reset(); + len = bm.readBytes(bytes, 2); + assertEquals(2, len); + bm.reset(); + len = bm.readBytes(bytes, 1); + assertEquals(1, len); + } public void testEOFByte() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java new file mode 100644 index 0000000000..af7856a78a --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java @@ -0,0 +1,434 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.message; + +import junit.framework.TestCase; +import org.apache.qpid.client.message.JMSStreamMessage; +import org.apache.qpid.client.message.TestMessageHelper; + +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import javax.jms.MessageFormatException; +import javax.jms.MessageEOFException; +import java.util.HashMap; + +/** + * @author Apache Software Foundation + */ +public class StreamMessageTest extends TestCase +{ + /** + * Tests that on creation a call to getBodyLength() throws an exception + * if null was passed in during creation + */ + public void testNotReadableOnCreationWithNull() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.readByte(); + fail("expected exception did not occur"); + } + catch (MessageNotReadableException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageNotReadableException, got " + e); + } + } + + public void testResetMakesReadble() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeInt(10); + bm.reset(); + bm.writeInt(12); + fail("expected exception did not occur"); + } + catch (MessageNotWriteableException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageNotWriteableException, got " + e); + } + } + + public void testClearBodyMakesWritable() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeInt(10); + bm.reset(); + bm.clearBody(); + bm.writeInt(10); + } + + public void testWriteBoolean() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeBoolean(true); + bm.writeBoolean(false); + bm.reset(); + boolean val = bm.readBoolean(); + assertEquals(true, val); + val = bm.readBoolean(); + assertEquals(false, val); + } + + public void testWriteInt() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeInt(10); + bm.reset(); + int val = bm.readInt(); + assertTrue(val == 10); + } + + public void testWriteString() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeString("Bananas"); + bm.reset(); + String res = bm.readString(); + assertEquals("Bananas", res); + } + + public void testWriteBytes() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + byte[] bytes = {1,2,3,4}; + bm.writeBytes(bytes, 1, 2); + bm.reset(); + bytes = new byte[2]; + bm.readBytes(bytes); + assertEquals(2, bytes[0]); + assertEquals(3, bytes[1]); + } + + public void testWriteObject() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeObject(new Boolean(true)); + bm.writeObject(new Boolean(false)); + bm.writeObject(new Byte((byte)2)); + bm.writeObject(new byte[]{1,2,3,4}); + bm.writeObject(new Character('g')); + bm.writeObject(new Short((short) 29)); + bm.writeObject(new Integer(101)); + bm.writeObject(new Long(50003222L)); + bm.writeObject("Foobar"); + bm.writeObject(new Float(1.7f)); + bm.writeObject(new Double(8.7d)); + bm.reset(); + assertTrue(bm.readBoolean()); + assertTrue(!bm.readBoolean()); + assertEquals((byte)2, bm.readByte()); + byte[] bytes = new byte[4]; + bm.readBytes(bytes); + assertEquals('g', bm.readChar()); + assertEquals((short) 29, bm.readShort()); + assertEquals(101, bm.readInt()); + assertEquals(50003222L, bm.readLong()); + assertEquals("Foobar", bm.readString()); + assertEquals(1.7f, bm.readFloat()); + assertEquals(8.7d, bm.readDouble()); + } + + public void testWriteObjectRejectsNonPrimitives() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeObject(new HashMap()); + fail("expected MessageFormatException was not thrown"); + } + catch (MessageFormatException e) + { + // pass + } + } + + public void testWriteObjectThrowsNPE() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeObject(null); + fail("expected exception did not occur"); + } + catch (NullPointerException n) + { + // ok + } + catch (Exception e) + { + fail("expected NullPointerException, got " + e); + } + } + + public void testReadBoolean() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeBoolean(true); + bm.reset(); + boolean result = bm.readBoolean(); + assertTrue(result); + } + + public void testReadBytesChecksNull() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.readBytes(null); + } + catch (IllegalArgumentException e) + { + // pass + } + } + + public void testReadBytesReturnsCorrectLengths() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + byte[] bytes = {2, 3}; + bm.writeBytes(bytes); + bm.writeBytes(null); + bm.writeBytes(new byte[]{}); + bm.reset(); + int len = bm.readBytes(bytes); + assertEquals(2, len); + len = bm.readBytes(bytes); + assertEquals(-1, len); + len = bm.readBytes(bytes); + assertEquals(0, len); + } + + public void testReadMultipleByteArrays() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + byte[] bytes = {2, 3, 4}; + bm.writeBytes(bytes); + bm.writeBytes(bytes); + bm.reset(); + byte[] result = new byte[2]; + int len = bm.readBytes(result); + assertEquals(2, len); + len = bm.readBytes(result); + assertEquals(1, len); + len = bm.readBytes(result); + assertEquals(2, len); + } + + public void testEOFByte() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeByte((byte)1); + bm.reset(); + bm.readByte(); + // should throw + bm.readByte(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testEOFBoolean() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeBoolean(true); + bm.reset(); + bm.readBoolean(); + // should throw + bm.readBoolean(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testEOFChar() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeChar('A'); + bm.reset(); + bm.readChar(); + // should throw + bm.readChar(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testEOFDouble() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeDouble(1.3d); + bm.reset(); + bm.readDouble(); + // should throw + bm.readDouble(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testEOFFloat() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeFloat(1.3f); + bm.reset(); + bm.readFloat(); + // should throw + bm.readFloat(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testEOFInt() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeInt(99); + bm.reset(); + bm.readInt(); + // should throw + bm.readInt(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testEOFLong() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeLong(4L); + bm.reset(); + bm.readLong(); + // should throw + bm.readLong(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testEOFShort() throws Exception + { + try + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.writeShort((short)4); + bm.reset(); + bm.readShort(); + // should throw + bm.readShort(); + fail("expected exception did not occur"); + } + catch (MessageEOFException m) + { + // ok + } + catch (Exception e) + { + fail("expected MessageEOFException, got " + e); + } + } + + public void testToBodyStringWithNull() throws Exception + { + JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage(); + bm.reset(); + String result = bm.toBodyString(); + assertNull(result); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(StreamMessageTest.class); + } +} |