diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/framing/ContentBody.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/framing/ContentBody.java | 59 |
1 files changed, 43 insertions, 16 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index aedb35f92a..9d39f8aa86 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.framing; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -31,22 +28,27 @@ public class ContentBody implements AMQBody { public static final byte TYPE = 3; - public byte[] _payload; + public ByteBuffer payload; public ContentBody() { } - public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException + public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException { - _payload = new byte[(int)size]; - buffer.read(_payload); + if (size > 0) + { + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); + } + } - public ContentBody(byte[] payload) + public ContentBody(ByteBuffer payload) { - _payload = payload; + this.payload = payload; } public byte getFrameType() @@ -56,12 +58,23 @@ public class ContentBody implements AMQBody public int getSize() { - return _payload == null ? 0 : _payload.length; + return (payload == null ? 0 : payload.limit()); } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(ByteBuffer buffer) { - buffer.write(_payload); + if (payload != null) + { + if(payload.isDirect() || payload.isReadOnly()) + { + ByteBuffer copy = payload.duplicate(); + buffer.put(copy.rewind()); + } + else + { + buffer.put(payload.array(),payload.arrayOffset(),payload.limit()); + } + } } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -70,18 +83,32 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } - protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException { if (size > 0) { - _payload = new byte[(int)size]; - buffer.read(_payload); + payload = buffer.slice(); + payload.limit((int) size); + buffer.skip((int) size); } } public void reduceBufferToFit() { + if (payload != null && (payload.remaining() < payload.capacity() / 2)) + { + int size = payload.limit(); + ByteBuffer newPayload = ByteBuffer.allocate(size); + + newPayload.put(payload); + newPayload.flip(); + + //reduce reference count on payload + payload.release(); + + payload = newPayload; + } } |