summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java39
1 files changed, 25 insertions, 14 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 82ffc60802..228867b2b0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -47,7 +47,7 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+ public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
@@ -56,14 +56,15 @@ public class AMQDataBlockDecoder
return false;
}
- in.skip(1 + 2);
- final long bodySize = in.getUnsignedInt();
+ in.position(in.position() + 1 + 2);
+ // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+ final long bodySize = in.getInt() & 0xffffffffL;
return (remainingAfterAttributes >= bodySize);
}
- protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
@@ -71,15 +72,7 @@ public class AMQDataBlockDecoder
BodyFactory bodyFactory;
if (type == AMQMethodBody.TYPE)
{
- bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
- if (bodyFactory == null)
- {
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQMethodBodyFactory(protocolSession);
- session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
-
- }
-
+ bodyFactory = methodBodyFactory;
}
else
{
@@ -115,6 +108,24 @@ public class AMQDataBlockDecoder
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- out.write(createAndPopulateFrame(session, in));
+ AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if (bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+ }
+
+ out.write(createAndPopulateFrame(bodyFactory, in));
+ }
+
+ public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+ {
+ return decodable(msg.buf());
+ }
+
+ public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
}
}