summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java334
1 files changed, 208 insertions, 126 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 69bf73bb49..281c0761d9 100644
--- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,9 +20,13 @@
*/
package org.apache.qpid.codec;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -50,8 +54,11 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
* @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
* per-session overhead.
*/
-public class AMQDecoder
+public class AMQDecoder extends CumulativeProtocolDecoder
{
+
+ private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -60,11 +67,12 @@ public class AMQDecoder
/** Flag to indicate whether this decoder needs to handle protocol initiation. */
private boolean _expectProtocolInitiation;
+ private boolean firstDecode = true;
private AMQMethodBodyFactory _bodyFactory;
- private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
-
+ private ByteBuffer _remainingBuf;
+
/**
* Creates a new AMQP decoder.
*
@@ -76,7 +84,98 @@ public class AMQDecoder
_bodyFactory = new AMQMethodBodyFactory(session);
}
+ /**
+ * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
+ * intiation decoders.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+
+ boolean decoded;
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (in.remaining() > 0)
+ && (in.get(in.position()) == (byte)'A')))
+ {
+ decoded = doDecodePI(session, in, out);
+ }
+ else
+ {
+ decoded = doDecodeDataBlock(session, in, out);
+ }
+ if(firstDecode && decoded)
+ {
+ firstDecode = false;
+ }
+ return decoded;
+ }
+
+ /**
+ * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ int pos = in.position();
+ boolean enoughData = _dataBlockDecoder.decodable(in.buf());
+ in.position(pos);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _dataBlockDecoder.decode(session, in, out);
+
+ return true;
+ }
+ }
+
+ /**
+ * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ boolean enoughData = _piDecoder.decodable(in.buf());
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+ out.write(pi);
+ return true;
+ }
+ }
/**
* Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
@@ -90,169 +189,152 @@ public class AMQDecoder
_expectProtocolInitiation = expectProtocolInitiation;
}
- private class RemainingByteArrayInputStream extends InputStream
- {
- private int _currentListPos;
- private int _markPos;
-
- @Override
- public int read() throws IOException
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( buf != null )
{
- ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
- if(currentStream.available() > 0)
- {
- return currentStream.read();
- }
- else if((_currentListPos == _remainingBufs.size())
- || (++_currentListPos == _remainingBufs.size()))
- {
- return -1;
- }
- else
- {
-
- ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
- stream.mark(0);
- return stream.read();
- }
+ buf.put( in );
+ buf.flip();
}
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException
+ else
{
+ buf = in;
+ }
- if(_currentListPos == _remainingBufs.size())
- {
- return -1;
- }
- else
+ for( ;; )
+ {
+ int oldPos = buf.position();
+ boolean decoded = doDecode( session, buf, out );
+ if( decoded )
{
- ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
- final int available = currentStream.available();
- int read = currentStream.read(b, off, len > available ? available : len);
- if(read < len)
+ if( buf.position() == oldPos )
{
- if(_currentListPos++ != _remainingBufs.size())
- {
- _remainingBufs.get(_currentListPos).mark(0);
- }
- int correctRead = read == -1 ? 0 : read;
- int subRead = read(b, off+correctRead, len-correctRead);
- if(subRead == -1)
- {
- return read;
- }
- else
- {
- return correctRead+subRead;
- }
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed." );
}
- else
+
+ if( !buf.hasRemaining() )
{
- return len;
+ break;
}
}
- }
-
- @Override
- public int available() throws IOException
- {
- int total = 0;
- for(int i = _currentListPos; i < _remainingBufs.size(); i++)
+ else
{
- total += _remainingBufs.get(i).available();
+ break;
}
- return total;
}
- @Override
- public void mark(final int readlimit)
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if ( buf.hasRemaining() )
{
- _markPos = _currentListPos;
- final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
- if(stream != null)
- {
- stream.mark(readlimit);
- }
+ storeRemainingInSession( buf, session );
}
+ else
+ {
+ removeSessionBuffer( session );
+ }
+ }
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose( IoSession session ) throws Exception
+ {
+ removeSessionBuffer( session );
+ }
- @Override
- public void reset() throws IOException
+ private void removeSessionBuffer(IoSession session)
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf != null )
{
- _currentListPos = _markPos;
- final int size = _remainingBufs.size();
- if(_currentListPos < size)
- {
- _remainingBufs.get(_currentListPos).reset();
- }
- for(int i = _currentListPos+1; i<size; i++)
- {
- _remainingBufs.get(i).reset();
- }
+ buf.release();
+ session.removeAttribute( BUFFER );
}
}
+ private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+ {
+ ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+ remainingBuf.setAutoExpand( true );
+ remainingBuf.put( buf );
+ session.setAttribute( BUFFER, remainingBuf );
+ }
- public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
{
// get prior remaining data from accumulator
ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
- DataInputStream msg;
-
-
- ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
- if(!_remainingBufs.isEmpty())
+ ByteBuffer msg;
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( _remainingBuf != null )
{
- _remainingBufs.add(bais);
- msg = new DataInputStream(new RemainingByteArrayInputStream());
+ _remainingBuf.put(buf);
+ _remainingBuf.flip();
+ msg = _remainingBuf;
}
else
{
- msg = new DataInputStream(bais);
+ msg = ByteBuffer.wrap(buf);
}
-
- boolean enoughData = true;
- while (enoughData)
+
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (msg.remaining() > 0)
+ && (msg.get(msg.position()) == (byte)'A')))
{
- if(!_expectProtocolInitiation)
+ if (_piDecoder.decodable(msg.buf()))
{
- enoughData = _dataBlockDecoder.decodable(msg);
- if (enoughData)
- {
- dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
- }
+ dataBlocks.add(new ProtocolInitiation(msg.buf()));
}
- else
+ }
+ else
+ {
+ boolean enoughData = true;
+ while (enoughData)
{
- enoughData = _piDecoder.decodable(msg);
- if (enoughData)
- {
- dataBlocks.add(new ProtocolInitiation(msg));
- }
-
- }
+ int pos = msg.position();
- if(!enoughData)
- {
- if(!_remainingBufs.isEmpty())
+ enoughData = _dataBlockDecoder.decodable(msg);
+ msg.position(pos);
+ if (enoughData)
{
- _remainingBufs.remove(_remainingBufs.size()-1);
- ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
- while(iterator.hasNext() && iterator.next().available() == 0)
- {
- iterator.remove();
- }
+ dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
}
- if(bais.available()!=0)
+ else
{
- byte[] remaining = new byte[bais.available()];
- bais.read(remaining);
- _remainingBufs.add(new ByteArrayInputStream(remaining));
+ _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+ _remainingBuf.setAutoExpand(true);
+ _remainingBuf.put(msg);
}
}
}
+ if(firstDecode && dataBlocks.size() > 0)
+ {
+ firstDecode = false;
+ }
return dataBlocks;
}
}