summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java217
1 files changed, 83 insertions, 134 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
index c981c951c3..637d9dd692 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
@@ -20,28 +20,26 @@
*/
package org.apache.qpid.client.message;
-import java.io.*;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
+import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
public static final String MIME_TYPE = "application/java-object-stream";
- private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256;
-
- private Serializable _readData;
- private ByteBuffer _data;
- private Exception _exception;
-
- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
/**
* Creates empty, writable message for use by producers
@@ -49,57 +47,41 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
*/
public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
{
- super(delegateFactory, false);
+ this(delegateFactory, null);
+ }
+
+ private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+ {
+ super(delegateFactory, data);
+ if (data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ _data.setAutoExpand(true);
+ }
+
+ setContentType(getMimeType());
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(AMQMessageDelegate delegate, final ByteBuffer data) throws AMQException
+ JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(delegate, data!=null);
-
- try
- {
- ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
- {
-
-
- @Override
- public int read() throws IOException
- {
- return data.get();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- len = data.remaining() < len ? data.remaining() : len;
- data.get(b, off, len);
- return len;
- }
- });
-
- _readData = (Serializable) in.readObject();
- }
- catch (IOException e)
- {
- _exception = e;
- }
- catch (ClassNotFoundException e)
- {
- _exception = e;
- }
+ super(delegate, data);
}
- public void clearBody() throws JMSException
+ public void clearBodyImpl() throws JMSException
{
- super.clearBody();
- _exception = null;
- _readData = null;
- _data = null;
+ if (_data != null)
+ {
+ _data.release();
+ _data = null;
+ }
+
+
+
}
public String toBodyString() throws JMSException
@@ -112,116 +94,83 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag
return MIME_TYPE;
}
- @Override
- public ByteBuffer getData() throws JMSException
+ public void setObject(Serializable serializable) throws JMSException
{
- if(_exception != null)
- {
- final MessageFormatException messageFormatException =
- new MessageFormatException("Unable to deserialize message");
- messageFormatException.setLinkedException(_exception);
- throw messageFormatException;
- }
- if(_readData == null)
- {
+ checkWritable();
- return _data == null ? EMPTY_BYTE_BUFFER : _data.duplicate();
+ if (_data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ _data.setAutoExpand(true);
}
else
{
- try
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(_readData);
- oos.flush();
- return ByteBuffer.wrap(baos.toByteArray());
- }
- catch (IOException e)
- {
- final JMSException jmsException = new JMSException("Unable to encode object of type: " +
- _readData.getClass().getName() + ", value " + _readData);
- jmsException.setLinkedException(e);
- throw jmsException;
- }
+ _data.rewind();
}
- }
-
- public void setObject(Serializable serializable) throws JMSException
- {
- checkWritable();
- clearBody();
try
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(serializable);
- oos.flush();
- _data = ByteBuffer.wrap(baos.toByteArray());
+ ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
+ out.writeObject(serializable);
+ out.flush();
+ out.close();
}
catch (IOException e)
{
- final JMSException jmsException = new JMSException("Unable to encode object of type: " +
- serializable.getClass().getName() + ", value " + serializable);
- jmsException.setLinkedException(e);
- throw jmsException;
+ MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
+ mfe.setLinkedException(e);
+ mfe.initCause(e);
+ throw mfe;
}
}
public Serializable getObject() throws JMSException
{
- if(_exception != null)
+ ObjectInputStream in = null;
+ if (_data == null)
{
- final MessageFormatException messageFormatException = new MessageFormatException("Unable to deserialize message");
- messageFormatException.setLinkedException(_exception);
- throw messageFormatException;
+ return null;
}
- else if(_readData != null || _data == null)
+
+ try
{
- return _readData;
+ _data.rewind();
+ in = new ObjectInputStream(_data.asInputStream());
+
+ return (Serializable) in.readObject();
}
- else
+ catch (IOException e)
+ {
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ mfe.initCause(e);
+ throw mfe;
+ }
+ catch (ClassNotFoundException e)
+ {
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+ mfe.setLinkedException(e);
+ mfe.initCause(e);
+ throw mfe;
+ }
+ finally
{
- Exception exception = null;
+ // _data.rewind();
+ close(in);
+ }
+ }
- final ByteBuffer data = _data.duplicate();
- try
- {
- ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
- {
- @Override
- public int read() throws IOException
- {
- return data.get();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- len = data.remaining() < len ? data.remaining() : len;
- data.get(b, off, len);
- return len;
- }
- });
-
- return (Serializable) in.readObject();
- }
- catch (ClassNotFoundException e)
- {
- exception = e;
- }
- catch (IOException e)
+ private static void close(InputStream in)
+ {
+ try
+ {
+ if (in != null)
{
- exception = e;
+ in.close();
}
-
- JMSException jmsException = new JMSException("Could not deserialize object");
- jmsException.setLinkedException(exception);
- throw jmsException;
}
-
+ catch (IOException ignore)
+ { }
}
-
}