diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java | 103 |
1 files changed, 90 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index f713554bfb..6ba55b207a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,38 +20,66 @@ */ package org.apache.qpid.client.message; -import java.nio.ByteBuffer; +import java.io.IOException; import java.util.Enumeration; import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; import javax.jms.MessageNotWriteableException; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message { + + protected ByteBuffer _data; + protected boolean _readableMessage = false; + protected boolean _changedData = true; + /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ + + + protected AMQMessageDelegate _delegate; private boolean _redelivered; - private boolean _receivedFromServer; - protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedData) + protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) { _delegate = delegateFactory.createDelegate(); - setContentType(getMimeType()); + _data = data; + if (_data != null) + { + _data.acquire(); + } + + + _readableMessage = (data != null); + _changedData = (data == null); + } - protected AbstractJMSMessage(AMQMessageDelegate delegate, boolean fromReceivedData) throws AMQException + protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { _delegate = delegate; - setContentType(getMimeType()); + + _data = data; + if (_data != null) + { + _data.acquire(); + } + + _readableMessage = data != null; + } public String getJMSMessageID() throws JMSException @@ -301,9 +329,12 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message public void clearBody() throws JMSException { - _receivedFromServer = false; + clearBodyImpl(); + _readableMessage = false; + } + public void acknowledgeThis() throws JMSException { _delegate.acknowledgeThis(); @@ -314,7 +345,14 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message _delegate.acknowledge(); } - /* + /** + * This forces concrete classes to implement clearBody() + * + * @throws JMSException + */ + public abstract void clearBodyImpl() throws JMSException; + + /** * Get a String representation of the body of the message. Used in the toString() method which outputs this before * message properties. */ @@ -375,24 +413,63 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message return _delegate; } - abstract public ByteBuffer getData() throws JMSException; + public ByteBuffer getData() + { + // make sure we rewind the data just in case any method has moved the + // position beyond the start + if (_data != null) + { + reset(); + } + return _data; + } + + protected void checkReadable() throws MessageNotReadableException + { + if (!_readableMessage) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } protected void checkWritable() throws MessageNotWriteableException { - if (_receivedFromServer) + if (_readableMessage) { throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); } } - - public void setReceivedFromServer() + public void reset() { - _receivedFromServer = true; + if (!_changedData) + { + _data.rewind(); + } + else + { + _data.flip(); + _changedData = false; + } } + public int getContentLength() + { + if(_data != null) + { + return _data.remaining(); + } + else + { + return 0; + } + } + public void receivedFromServer() + { + _changedData = false; + } /** * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls |