summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-01 16:27:52 +0000
committerAidan Skinner <aidan@apache.org>2009-09-01 16:27:52 +0000
commita7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb (patch)
tree1bb4d963df5afb0293fea0fb60c3282bb46fed1c /qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
parentf0051104b5b99601507c578bd0a7b819a76aef55 (diff)
downloadqpid-python-a7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb.tar.gz
QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@810110 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java79
1 files changed, 74 insertions, 5 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 7eef73f337..281c0761d9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,14 +20,21 @@
*/
package org.apache.qpid.codec;
+import java.util.ArrayList;
+
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -62,14 +69,19 @@ public class AMQDecoder extends CumulativeProtocolDecoder
private boolean _expectProtocolInitiation;
private boolean firstDecode = true;
+ private AMQMethodBodyFactory _bodyFactory;
+
+ private ByteBuffer _remainingBuf;
+
/**
* Creates a new AMQP decoder.
*
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
*/
- public AMQDecoder(boolean expectProtocolInitiation)
+ public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
{
_expectProtocolInitiation = expectProtocolInitiation;
+ _bodyFactory = new AMQMethodBodyFactory(session);
}
/**
@@ -120,7 +132,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
int pos = in.position();
- boolean enoughData = _dataBlockDecoder.decodable(session, in);
+ boolean enoughData = _dataBlockDecoder.decodable(in.buf());
in.position(pos);
if (!enoughData)
{
@@ -149,7 +161,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
*/
private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- boolean enoughData = _piDecoder.decodable(session, in);
+ boolean enoughData = _piDecoder.decodable(in.buf());
if (!enoughData)
{
// returning false means it will leave the contents in the buffer and
@@ -158,7 +170,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder
}
else
{
- _piDecoder.decode(session, in, out);
+ ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+ out.write(pi);
return true;
}
@@ -177,7 +190,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
}
- /**
+ /**
* Cumulates content of <tt>in</tt> into internal buffer and forwards
* decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
* <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
@@ -268,4 +281,60 @@ public class AMQDecoder extends CumulativeProtocolDecoder
session.setAttribute( BUFFER, remainingBuf );
}
+ public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+
+ // get prior remaining data from accumulator
+ ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
+ ByteBuffer msg;
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( _remainingBuf != null )
+ {
+ _remainingBuf.put(buf);
+ _remainingBuf.flip();
+ msg = _remainingBuf;
+ }
+ else
+ {
+ msg = ByteBuffer.wrap(buf);
+ }
+
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (msg.remaining() > 0)
+ && (msg.get(msg.position()) == (byte)'A')))
+ {
+ if (_piDecoder.decodable(msg.buf()))
+ {
+ dataBlocks.add(new ProtocolInitiation(msg.buf()));
+ }
+ }
+ else
+ {
+ boolean enoughData = true;
+ while (enoughData)
+ {
+ int pos = msg.position();
+
+ enoughData = _dataBlockDecoder.decodable(msg);
+ msg.position(pos);
+ if (enoughData)
+ {
+ dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
+ }
+ else
+ {
+ _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+ _remainingBuf.setAutoExpand(true);
+ _remainingBuf.put(msg);
+ }
+ }
+ }
+ if(firstDecode && dataBlocks.size() > 0)
+ {
+ firstDecode = false;
+ }
+ return dataBlocks;
+ }
}