diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java | 65 |
1 files changed, 59 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index aedb35f92a..541d104dc9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -20,9 +20,11 @@ */ package org.apache.qpid.framing; +import java.io.DataInput; import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.AMQException; @@ -37,10 +39,10 @@ public class ContentBody implements AMQBody { } - public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException + public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException { _payload = new byte[(int)size]; - buffer.read(_payload); + buffer.readFully(_payload); } @@ -59,7 +61,7 @@ public class ContentBody implements AMQBody return _payload == null ? 0 : _payload.length; } - public void writePayload(DataOutputStream buffer) throws IOException + public void writePayload(DataOutput buffer) throws IOException { buffer.write(_payload); } @@ -84,11 +86,62 @@ public class ContentBody implements AMQBody { } + private static class BufferContentBody implements AMQBody + { + private final int _length; + private final int _offset; + private final ByteBuffer _buf; + + private BufferContentBody( ByteBuffer buf, int offset, int length) + { + _length = length; + _offset = offset; + _buf = buf; + } + + public byte getFrameType() + { + return TYPE; + } + + + public int getSize() + { + return _length; + } + public void writePayload(DataOutput buffer) throws IOException + { + if(_buf.hasArray()) + { + buffer.write(_buf.array(), _buf.arrayOffset() + _offset, _length); + } + else + { + byte[] data = new byte[_length]; + ByteBuffer buf = _buf.duplicate(); + + buf.position(_offset); + buf.limit(_offset+_length); + buf.get(data); + buffer.write(data); + } + } + + + public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException + { + throw new RuntimeException("Buffered Body only to be used for outgoing data"); + } + } + + public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length) + { + return new AMQFrame(channelId, new BufferContentBody(buf, offset, length)); + } public static AMQFrame createAMQFrame(int channelId, ContentBody body) { - final AMQFrame frame = new AMQFrame(channelId, body); - return frame; + return new AMQFrame(channelId, body); } } |