summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java65
1 files changed, 59 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
index aedb35f92a..541d104dc9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -37,10 +39,10 @@ public class ContentBody implements AMQBody
{
}
- public ContentBody(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
+ public ContentBody(DataInput buffer, long size) throws AMQFrameDecodingException, IOException
{
_payload = new byte[(int)size];
- buffer.read(_payload);
+ buffer.readFully(_payload);
}
@@ -59,7 +61,7 @@ public class ContentBody implements AMQBody
return _payload == null ? 0 : _payload.length;
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
buffer.write(_payload);
}
@@ -84,11 +86,62 @@ public class ContentBody implements AMQBody
{
}
+ private static class BufferContentBody implements AMQBody
+ {
+ private final int _length;
+ private final int _offset;
+ private final ByteBuffer _buf;
+
+ private BufferContentBody( ByteBuffer buf, int offset, int length)
+ {
+ _length = length;
+ _offset = offset;
+ _buf = buf;
+ }
+
+ public byte getFrameType()
+ {
+ return TYPE;
+ }
+
+
+ public int getSize()
+ {
+ return _length;
+ }
+ public void writePayload(DataOutput buffer) throws IOException
+ {
+ if(_buf.hasArray())
+ {
+ buffer.write(_buf.array(), _buf.arrayOffset() + _offset, _length);
+ }
+ else
+ {
+ byte[] data = new byte[_length];
+ ByteBuffer buf = _buf.duplicate();
+
+ buf.position(_offset);
+ buf.limit(_offset+_length);
+ buf.get(data);
+ buffer.write(data);
+ }
+ }
+
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+ {
+ throw new RuntimeException("Buffered Body only to be used for outgoing data");
+ }
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, ByteBuffer buf, int offset, int length)
+ {
+ return new AMQFrame(channelId, new BufferContentBody(buf, offset, length));
+ }
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
- final AMQFrame frame = new AMQFrame(channelId, body);
- return frame;
+ return new AMQFrame(channelId, body);
}
}