diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java | 59 |
1 files changed, 16 insertions, 43 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index 9d39f8aa86..aedb35f92a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -28,27 +31,22 @@ public class ContentBody implements AMQBody { public static final byte TYPE = 3; - public ByteBuffer payload; + public byte[] _payload; public ContentBody() { } - public ContentBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException + public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { - if (size > 0) - { - payload = buffer.slice(); - payload.limit((int) size); - buffer.skip((int) size); - } - + _payload = new byte[(int)size]; + buffer.read(_payload); } - public ContentBody(ByteBuffer payload) + public ContentBody(byte[] payload) { - this.payload = payload; + _payload = payload; } public byte getFrameType() @@ -58,23 +56,12 @@ public class ContentBody implements AMQBody public int getSize() { - return (payload == null ? 0 : payload.limit()); + return _payload == null ? 0 : _payload.length; } - public void writePayload(ByteBuffer buffer) + public void writePayload(DataOutputStream buffer) throws IOException { - if (payload != null) - { - if(payload.isDirect() || payload.isReadOnly()) - { - ByteBuffer copy = payload.duplicate(); - buffer.put(copy.rewind()); - } - else - { - buffer.put(payload.array(),payload.arrayOffset(),payload.limit()); - } - } + buffer.write(_payload); } public void handle(final int channelId, final AMQVersionAwareProtocolSession session) @@ -83,32 +70,18 @@ public class ContentBody implements AMQBody session.contentBodyReceived(channelId, this); } - protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException { if (size > 0) { - payload = buffer.slice(); - payload.limit((int) size); - buffer.skip((int) size); + _payload = new byte[(int)size]; + buffer.read(_payload); } } public void reduceBufferToFit() { - if (payload != null && (payload.remaining() < payload.capacity() / 2)) - { - int size = payload.limit(); - ByteBuffer newPayload = ByteBuffer.allocate(size); - - newPayload.put(payload); - newPayload.flip(); - - //reduce reference count on payload - payload.release(); - - payload = newPayload; - } } |