summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java103
1 files changed, 90 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index f713554bfb..6ba55b207a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -20,38 +20,66 @@
*/
package org.apache.qpid.client.message;
-import java.nio.ByteBuffer;
+import java.io.IOException;
import java.util.Enumeration;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
{
+
+ protected ByteBuffer _data;
+ protected boolean _readableMessage = false;
+ protected boolean _changedData = true;
+
/** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+
+
+
protected AMQMessageDelegate _delegate;
private boolean _redelivered;
- private boolean _receivedFromServer;
- protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedData)
+ protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
_delegate = delegateFactory.createDelegate();
- setContentType(getMimeType());
+ _data = data;
+ if (_data != null)
+ {
+ _data.acquire();
+ }
+
+
+ _readableMessage = (data != null);
+ _changedData = (data == null);
+
}
- protected AbstractJMSMessage(AMQMessageDelegate delegate, boolean fromReceivedData) throws AMQException
+ protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
_delegate = delegate;
- setContentType(getMimeType());
+
+ _data = data;
+ if (_data != null)
+ {
+ _data.acquire();
+ }
+
+ _readableMessage = data != null;
+
}
public String getJMSMessageID() throws JMSException
@@ -301,9 +329,12 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
public void clearBody() throws JMSException
{
- _receivedFromServer = false;
+ clearBodyImpl();
+ _readableMessage = false;
+
}
+
public void acknowledgeThis() throws JMSException
{
_delegate.acknowledgeThis();
@@ -314,7 +345,14 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
_delegate.acknowledge();
}
- /*
+ /**
+ * This forces concrete classes to implement clearBody()
+ *
+ * @throws JMSException
+ */
+ public abstract void clearBodyImpl() throws JMSException;
+
+ /**
* Get a String representation of the body of the message. Used in the toString() method which outputs this before
* message properties.
*/
@@ -375,24 +413,63 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
return _delegate;
}
- abstract public ByteBuffer getData() throws JMSException;
+ public ByteBuffer getData()
+ {
+ // make sure we rewind the data just in case any method has moved the
+ // position beyond the start
+ if (_data != null)
+ {
+ reset();
+ }
+ return _data;
+ }
+
+ protected void checkReadable() throws MessageNotReadableException
+ {
+ if (!_readableMessage)
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
protected void checkWritable() throws MessageNotWriteableException
{
- if (_receivedFromServer)
+ if (_readableMessage)
{
throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
}
}
-
- public void setReceivedFromServer()
+ public void reset()
{
- _receivedFromServer = true;
+ if (!_changedData)
+ {
+ _data.rewind();
+ }
+ else
+ {
+ _data.flip();
+ _changedData = false;
+ }
}
+ public int getContentLength()
+ {
+ if(_data != null)
+ {
+ return _data.remaining();
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ public void receivedFromServer()
+ {
+ _changedData = false;
+ }
/**
* The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls