diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java | 217 |
1 files changed, 83 insertions, 134 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java index c981c951c3..637d9dd692 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java @@ -20,28 +20,26 @@ */ package org.apache.qpid.client.message; -import java.io.*; -import java.nio.ByteBuffer; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.ObjectMessage; +import org.apache.mina.common.ByteBuffer; + import org.apache.qpid.AMQException; -import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream; public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage { public static final String MIME_TYPE = "application/java-object-stream"; - private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256; - - private Serializable _readData; - private ByteBuffer _data; - private Exception _exception; - - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); + private static final int DEFAULT_BUFFER_SIZE = 1024; /** * Creates empty, writable message for use by producers @@ -49,57 +47,41 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag */ public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory) { - super(delegateFactory, false); + this(delegateFactory, null); + } + + private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) + { + super(delegateFactory, data); + if (data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + _data.setAutoExpand(true); + } + + setContentType(getMimeType()); } /** * Creates read only message for delivery to consumers */ - JMSObjectMessage(AMQMessageDelegate delegate, final ByteBuffer data) throws AMQException + JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { - super(delegate, data!=null); - - try - { - ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream() - { - - - @Override - public int read() throws IOException - { - return data.get(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException - { - len = data.remaining() < len ? data.remaining() : len; - data.get(b, off, len); - return len; - } - }); - - _readData = (Serializable) in.readObject(); - } - catch (IOException e) - { - _exception = e; - } - catch (ClassNotFoundException e) - { - _exception = e; - } + super(delegate, data); } - public void clearBody() throws JMSException + public void clearBodyImpl() throws JMSException { - super.clearBody(); - _exception = null; - _readData = null; - _data = null; + if (_data != null) + { + _data.release(); + _data = null; + } + + + } public String toBodyString() throws JMSException @@ -112,116 +94,83 @@ public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessag return MIME_TYPE; } - @Override - public ByteBuffer getData() throws JMSException + public void setObject(Serializable serializable) throws JMSException { - if(_exception != null) - { - final MessageFormatException messageFormatException = - new MessageFormatException("Unable to deserialize message"); - messageFormatException.setLinkedException(_exception); - throw messageFormatException; - } - if(_readData == null) - { + checkWritable(); - return _data == null ? EMPTY_BYTE_BUFFER : _data.duplicate(); + if (_data == null) + { + _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); + _data.setAutoExpand(true); } else { - try - { - ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(_readData); - oos.flush(); - return ByteBuffer.wrap(baos.toByteArray()); - } - catch (IOException e) - { - final JMSException jmsException = new JMSException("Unable to encode object of type: " + - _readData.getClass().getName() + ", value " + _readData); - jmsException.setLinkedException(e); - throw jmsException; - } + _data.rewind(); } - } - - public void setObject(Serializable serializable) throws JMSException - { - checkWritable(); - clearBody(); try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(serializable); - oos.flush(); - _data = ByteBuffer.wrap(baos.toByteArray()); + ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream()); + out.writeObject(serializable); + out.flush(); + out.close(); } catch (IOException e) { - final JMSException jmsException = new JMSException("Unable to encode object of type: " + - serializable.getClass().getName() + ", value " + serializable); - jmsException.setLinkedException(e); - throw jmsException; + MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e); + mfe.setLinkedException(e); + mfe.initCause(e); + throw mfe; } } public Serializable getObject() throws JMSException { - if(_exception != null) + ObjectInputStream in = null; + if (_data == null) { - final MessageFormatException messageFormatException = new MessageFormatException("Unable to deserialize message"); - messageFormatException.setLinkedException(_exception); - throw messageFormatException; + return null; } - else if(_readData != null || _data == null) + + try { - return _readData; + _data.rewind(); + in = new ObjectInputStream(_data.asInputStream()); + + return (Serializable) in.readObject(); } - else + catch (IOException e) + { + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + mfe.initCause(e); + throw mfe; + } + catch (ClassNotFoundException e) + { + MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e); + mfe.setLinkedException(e); + mfe.initCause(e); + throw mfe; + } + finally { - Exception exception = null; + // _data.rewind(); + close(in); + } + } - final ByteBuffer data = _data.duplicate(); - try - { - ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream() - { - @Override - public int read() throws IOException - { - return data.get(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException - { - len = data.remaining() < len ? data.remaining() : len; - data.get(b, off, len); - return len; - } - }); - - return (Serializable) in.readObject(); - } - catch (ClassNotFoundException e) - { - exception = e; - } - catch (IOException e) + private static void close(InputStream in) + { + try + { + if (in != null) { - exception = e; + in.close(); } - - JMSException jmsException = new JMSException("Could not deserialize object"); - jmsException.setLinkedException(exception); - throw jmsException; } - + catch (IOException ignore) + { } } - } |