diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-06 23:29:08 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-06 23:29:08 +0000 |
commit | 92988791da3dffc53c29cc73fbdd831dcb04531a (patch) | |
tree | 8d5887a8fc9f176e491282cbb6551ee842413c5e | |
parent | be821e59060035f7041c35353aafbf576e92b55b (diff) | |
download | qpid-python-92988791da3dffc53c29cc73fbdd831dcb04531a.tar.gz |
In progress StreamMessage and refactored BytesMessage
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/new_persistence@483292 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 470 insertions, 122 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java new file mode 100644 index 0000000000..39c8298add --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageEOFException; +import javax.jms.MessageNotWriteableException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.CharacterCodingException; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesMessage extends AbstractJMSMessage +{ + private boolean _readable = false; + + /** + * The default initial size of the buffer. The buffer expands automatically. + */ + private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + + AbstractBytesMessage() + { + this(null); + } + + /** + * Construct a bytes message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesMessage(ByteBuffer data) + { + super(data); // this instanties a content header + getJmsContentHeaderProperties().setContentType(getMimeType()); + + if (_data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); + _data.setAutoExpand(true); + } + _readable = (data != null); + } + + AbstractBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + throws AMQException + { + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); + getJmsContentHeaderProperties().setContentType(getMimeType()); + _readable = true; + } + + public void clearBody() throws JMSException + { + _data.clear(); + _readable = false; + } + + public String toBodyString() throws JMSException + { + checkReadable(); + try + { + return getText(); + } + catch (IOException e) + { + throw new JMSException(e.toString()); + } + } + + /** + * We reset the stream before and after reading the data. This means that toString() will always output + * the entire message and also that the caller can then immediately start reading as if toString() had + * never been called. + * + * @return + * @throws IOException + */ + private String getText() throws IOException + { + // this will use the default platform encoding + if (_data == null) + { + return null; + } + int pos = _data.position(); + _data.rewind(); + // one byte left is for the end of frame marker + if (_data.remaining() == 0) + { + // this is really redundant since pos must be zero + _data.position(pos); + return null; + } + else + { + String data = _data.getString(Charset.forName("UTF8").newDecoder()); + _data.position(pos); + return data; + } + } + + protected void checkReadable() throws MessageNotReadableException + { + if (!_readable) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws javax.jms.MessageEOFException if there are less than len bytes available to read + */ + protected void checkAvailable(int len) throws MessageEOFException + { + if (_data.remaining() < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } + + protected void checkWritable() throws MessageNotWriteableException + { + if (_readable) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + public void reset() throws JMSException + { + //checkWritable(); + _data.flip(); + _readable = true; + } + + public boolean isReadable() + { + return _readable; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 6921b0a4e6..7da5b3eed9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -25,24 +25,14 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.AMQException; import org.apache.mina.common.ByteBuffer; -import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import javax.jms.MessageEOFException; +import javax.jms.*; import java.io.*; import java.nio.charset.Charset; import java.nio.charset.CharacterCodingException; -public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage +public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage { - private static final String MIME_TYPE = "application/octet-stream"; - - private boolean _readable = false; - - /** - * The default initial size of the buffer. The buffer expands automatically. - */ - private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + private static final String MIME_TYPE = "application/octet-stream"; JMSBytesMessage() { @@ -58,117 +48,25 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt JMSBytesMessage(ByteBuffer data) { super(data); // this instanties a content header - getJmsContentHeaderProperties().setContentType(MIME_TYPE); - - if (_data == null) - { - _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE); - _data.setAutoExpand(true); - } - _readable = (data != null); } JMSBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) throws AMQException - { - // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea - super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data); - getJmsContentHeaderProperties().setContentType(MIME_TYPE); - _readable = true; - } - - public void clearBody() throws JMSException - { - _data.clear(); - _readable = false; - } - - public String toBodyString() throws JMSException - { - checkReadable(); - try - { - return getText(); - } - catch (IOException e) - { - throw new JMSException(e.toString()); - } - } - - /** - * We reset the stream before and after reading the data. This means that toString() will always output - * the entire message and also that the caller can then immediately start reading as if toString() had - * never been called. - * - * @return - * @throws IOException - */ - private String getText() throws IOException - { - // this will use the default platform encoding - if (_data == null) - { - return null; - } - int pos = _data.position(); - _data.rewind(); - // one byte left is for the end of frame marker - if (_data.remaining() == 0) - { - // this is really redundant since pos must be zero - _data.position(pos); - return null; - } - else - { - String data = _data.getString(Charset.forName("UTF8").newDecoder()); - _data.position(pos); - return data; - } + { + super(messageNbr, contentHeader, data); } public String getMimeType() { return MIME_TYPE; - } + } public long getBodyLength() throws JMSException { checkReadable(); return _data.limit(); } - - private void checkReadable() throws MessageNotReadableException - { - if (!_readable) - { - throw new MessageNotReadableException("You need to call reset() to make the message readable"); - } - } - - /** - * Check that there is at least a certain number of bytes available to read - * - * @param len the number of bytes - * @throws MessageEOFException if there are less than len bytes available to read - */ - private void checkAvailable(int len) throws MessageEOFException - { - if (_data.remaining() < len) - { - throw new MessageEOFException("Unable to read " + len + " bytes"); - } - } - - private void checkWritable() throws MessageNotWriteableException - { - if (_readable) - { - throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); - } - } - + public boolean readBoolean() throws JMSException { checkReadable(); @@ -388,17 +286,5 @@ public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.Byt throw new NullPointerException("Argument must not be null"); } _data.putObject(object); - } - - public void reset() throws JMSException - { - //checkWritable(); - _data.flip(); - _readable = true; - } - - public boolean isReadable() - { - return _readable; - } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java new file mode 100644 index 0000000000..061b010a03 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import javax.jms.StreamMessage; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.nio.charset.Charset; +import java.nio.charset.CharacterCodingException; + +/** + * @author Apache Software Foundation + */ +public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage +{ + private static final String MIME_TYPE="jms/stream-message"; + + private static final String[] _typeNames = { + "boolean", + "byte", + "short", + "char", + "int", + "long", + "float", + "double", + "utf string"}; + + private static final byte BOOLEAN_TYPE = (byte) 1; + + private static final byte BYTE_TYPE = (byte) 2; + + private static final byte SHORT_TYPE = (byte) 3; + + private static final byte CHAR_TYPE = (byte) 4; + + private static final byte INT_TYPE = (byte) 5; + + private static final byte LONG_TYPE = (byte) 6; + + private static final byte FLOAT_TYPE = (byte) 7; + + private static final byte DOUBLE_TYPE = (byte) 8; + + private static final byte STRING_TYPE = (byte) 9; + + public String getMimeType() + { + return MIME_TYPE; + } + + public boolean readBoolean() throws JMSException + { + checkReadable(); + checkAvailable(2); + readAndCheckType(BOOLEAN_TYPE); + return _data.get() != 0; + } + + private void readAndCheckType(byte type) throws MessageFormatException + { + if (_data.get() != type) + { + throw new MessageFormatException("Type " + _typeNames[type] + " not found next in stream"); + } + } + + private void writeTypeDiscriminator(byte type) + { + _data.put(type); + } + + public byte readByte() throws JMSException + { + checkReadable(); + checkAvailable(2); + readAndCheckType(BYTE_TYPE); + return _data.get(); + } + + public short readShort() throws JMSException + { + checkReadable(); + checkAvailable(3); + readAndCheckType(SHORT_TYPE); + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws JMSException + */ + public char readChar() throws JMSException + { + checkReadable(); + checkAvailable(3); + readAndCheckType(CHAR_TYPE); + return _data.getChar(); + } + + public int readInt() throws JMSException + { + checkReadable(); + checkAvailable(5); + readAndCheckType(INT_TYPE); + return _data.getInt(); + } + + public long readLong() throws JMSException + { + checkReadable(); + checkAvailable(9); + readAndCheckType(LONG_TYPE); + return _data.getLong(); + } + + public float readFloat() throws JMSException + { + checkReadable(); + checkAvailable(5); + readAndCheckType(FLOAT_TYPE); + return _data.getFloat(); + } + + public double readDouble() throws JMSException + { + checkReadable(); + checkAvailable(9); + readAndCheckType(DOUBLE_TYPE); + return _data.getDouble(); + } + + public String readString() throws JMSException + { + checkReadable(); + // we check only for one byte plus the type byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + checkAvailable(2); + readAndCheckType(STRING_TYPE); + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + public int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining()); + if (count == 0) + { + return -1; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + public Object readObject() throws JMSException + { + return null; + } + + public void writeBoolean(boolean b) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + public void writeByte(byte b) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + public void writeShort(short i) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + public void writeChar(char c) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + public void writeInt(int i) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(INT_TYPE); + _data.putInt(i); + } + + public void writeLong(long l) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + public void writeFloat(float v) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + public void writeDouble(double v) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + public void writeString(String string) throws JMSException + { + checkWritable(); + writeTypeDiscriminator(STRING_TYPE); + try + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + + public void writeBytes(byte[] bytes) throws JMSException + { + checkWritable(); + _data.put(bytes); + } + + public void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + checkWritable(); + _data.put(bytes, offset, length); + } + + public void writeObject(Object object) throws JMSException + { + checkWritable(); + if (object == null) + { + throw new NullPointerException("Argument must not be null"); + } + _data.putObject(object); + } +} |