summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/framing
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/framing')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java20
3 files changed, 35 insertions, 26 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 82ffc60802..228867b2b0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -47,7 +47,7 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+ public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
@@ -56,14 +56,15 @@ public class AMQDataBlockDecoder
return false;
}
- in.skip(1 + 2);
- final long bodySize = in.getUnsignedInt();
+ in.position(in.position() + 1 + 2);
+ // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+ final long bodySize = in.getInt() & 0xffffffffL;
return (remainingAfterAttributes >= bodySize);
}
- protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
@@ -71,15 +72,7 @@ public class AMQDataBlockDecoder
BodyFactory bodyFactory;
if (type == AMQMethodBody.TYPE)
{
- bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
- if (bodyFactory == null)
- {
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQMethodBodyFactory(protocolSession);
- session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
-
- }
-
+ bodyFactory = methodBodyFactory;
}
else
{
@@ -115,6 +108,24 @@ public class AMQDataBlockDecoder
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- out.write(createAndPopulateFrame(session, in));
+ AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if (bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+ }
+
+ out.write(createAndPopulateFrame(bodyFactory, in));
+ }
+
+ public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+ {
+ return decodable(msg.buf());
+ }
+
+ public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
index 05fd2bb480..374644b4f2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder
{
_logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
}
-
+
out.write(buffer);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 3ac17e9204..cf8a866e47 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.AMQException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -53,13 +51,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMajor = protocolMajor;
_protocolMinor = protocolMinor;
}
-
+
public ProtocolInitiation(ProtocolVersion pv)
{
this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
}
-
public ProtocolInitiation(ByteBuffer in)
{
_protocolHeader = new byte[4];
@@ -71,6 +68,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMinor = in.get();
}
+ public void writePayload(org.apache.mina.common.ByteBuffer buffer)
+ {
+ writePayload(buffer.buf());
+ }
+
public long getSize()
{
return 4 + 1 + 1 + 1 + 1;
@@ -127,16 +129,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(IoSession session, ByteBuffer in)
+ public boolean decodable(ByteBuffer in)
{
return (in.remaining() >= 8);
}
- public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
- {
- ProtocolInitiation pi = new ProtocolInitiation(in);
- out.write(pi);
- }
}
public ProtocolVersion checkVersion() throws AMQException
@@ -192,4 +189,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
buffer.append(Integer.toHexString(_protocolMinor));
return buffer.toString();
}
+
}