diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java | 334 |
1 files changed, 208 insertions, 126 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 69bf73bb49..281c0761d9 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,9 +20,13 @@ */ package org.apache.qpid.codec; -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.codec.CumulativeProtocolDecoder; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; @@ -50,8 +54,11 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the * per-session overhead. */ -public class AMQDecoder +public class AMQDecoder extends CumulativeProtocolDecoder { + + private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer"; + /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -60,11 +67,12 @@ public class AMQDecoder /** Flag to indicate whether this decoder needs to handle protocol initiation. */ private boolean _expectProtocolInitiation; + private boolean firstDecode = true; private AMQMethodBodyFactory _bodyFactory; - private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); - + private ByteBuffer _remainingBuf; + /** * Creates a new AMQP decoder. * @@ -76,7 +84,98 @@ public class AMQDecoder _bodyFactory = new AMQMethodBodyFactory(session); } + /** + * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol + * intiation decoders. + * + * @param session The Mina session. + * @param in The raw byte buffer. + * @param out The Mina object output gatherer to write decoded objects to. + * + * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate. + * + * @throws Exception If the data cannot be decoded for any reason. + */ + protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + + boolean decoded; + if (_expectProtocolInitiation + || (firstDecode + && (in.remaining() > 0) + && (in.get(in.position()) == (byte)'A'))) + { + decoded = doDecodePI(session, in, out); + } + else + { + decoded = doDecodeDataBlock(session, in, out); + } + if(firstDecode && decoded) + { + firstDecode = false; + } + return decoded; + } + + /** + * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}. + * + * @param session The Mina session. + * @param in The raw byte buffer. + * @param out The Mina object output gatherer to write decoded objects to. + * + * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate. + * + * @throws Exception If the data cannot be decoded for any reason. + */ + protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + int pos = in.position(); + boolean enoughData = _dataBlockDecoder.decodable(in.buf()); + in.position(pos); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + _dataBlockDecoder.decode(session, in, out); + + return true; + } + } + + /** + * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}. + * + * @param session The Mina session. + * @param in The raw byte buffer. + * @param out The Mina object output gatherer to write decoded objects to. + * + * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate. + * + * @throws Exception If the data cannot be decoded for any reason. + */ + private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception + { + boolean enoughData = _piDecoder.decodable(in.buf()); + if (!enoughData) + { + // returning false means it will leave the contents in the buffer and + // call us again when more data has been read + return false; + } + else + { + ProtocolInitiation pi = new ProtocolInitiation(in.buf()); + out.write(pi); + return true; + } + } /** * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol @@ -90,169 +189,152 @@ public class AMQDecoder _expectProtocolInitiation = expectProtocolInitiation; } - private class RemainingByteArrayInputStream extends InputStream - { - private int _currentListPos; - private int _markPos; - - @Override - public int read() throws IOException + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode( IoSession session, ByteBuffer in, + ProtocolDecoderOutput out ) throws Exception + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( buf != null ) { - ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos); - if(currentStream.available() > 0) - { - return currentStream.read(); - } - else if((_currentListPos == _remainingBufs.size()) - || (++_currentListPos == _remainingBufs.size())) - { - return -1; - } - else - { - - ByteArrayInputStream stream = _remainingBufs.get(_currentListPos); - stream.mark(0); - return stream.read(); - } + buf.put( in ); + buf.flip(); } - - @Override - public int read(final byte[] b, final int off, final int len) throws IOException + else { + buf = in; + } - if(_currentListPos == _remainingBufs.size()) - { - return -1; - } - else + for( ;; ) + { + int oldPos = buf.position(); + boolean decoded = doDecode( session, buf, out ); + if( decoded ) { - ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos); - final int available = currentStream.available(); - int read = currentStream.read(b, off, len > available ? available : len); - if(read < len) + if( buf.position() == oldPos ) { - if(_currentListPos++ != _remainingBufs.size()) - { - _remainingBufs.get(_currentListPos).mark(0); - } - int correctRead = read == -1 ? 0 : read; - int subRead = read(b, off+correctRead, len-correctRead); - if(subRead == -1) - { - return read; - } - else - { - return correctRead+subRead; - } + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed." ); } - else + + if( !buf.hasRemaining() ) { - return len; + break; } } - } - - @Override - public int available() throws IOException - { - int total = 0; - for(int i = _currentListPos; i < _remainingBufs.size(); i++) + else { - total += _remainingBufs.get(i).available(); + break; } - return total; } - @Override - public void mark(final int readlimit) + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if ( buf.hasRemaining() ) { - _markPos = _currentListPos; - final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos); - if(stream != null) - { - stream.mark(readlimit); - } + storeRemainingInSession( buf, session ); } + else + { + removeSessionBuffer( session ); + } + } + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose( IoSession session ) throws Exception + { + removeSessionBuffer( session ); + } - @Override - public void reset() throws IOException + private void removeSessionBuffer(IoSession session) + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + if( buf != null ) { - _currentListPos = _markPos; - final int size = _remainingBufs.size(); - if(_currentListPos < size) - { - _remainingBufs.get(_currentListPos).reset(); - } - for(int i = _currentListPos+1; i<size; i++) - { - _remainingBufs.get(i).reset(); - } + buf.release(); + session.removeAttribute( BUFFER ); } } + private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) + { + ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false ); + remainingBuf.setAutoExpand( true ); + remainingBuf.put( buf ); + session.setAttribute( BUFFER, remainingBuf ); + } - public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException + public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException { // get prior remaining data from accumulator ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); - DataInputStream msg; - - - ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); - if(!_remainingBufs.isEmpty()) + ByteBuffer msg; + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( _remainingBuf != null ) { - _remainingBufs.add(bais); - msg = new DataInputStream(new RemainingByteArrayInputStream()); + _remainingBuf.put(buf); + _remainingBuf.flip(); + msg = _remainingBuf; } else { - msg = new DataInputStream(bais); + msg = ByteBuffer.wrap(buf); } - - boolean enoughData = true; - while (enoughData) + + if (_expectProtocolInitiation + || (firstDecode + && (msg.remaining() > 0) + && (msg.get(msg.position()) == (byte)'A'))) { - if(!_expectProtocolInitiation) + if (_piDecoder.decodable(msg.buf())) { - enoughData = _dataBlockDecoder.decodable(msg); - if (enoughData) - { - dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); - } + dataBlocks.add(new ProtocolInitiation(msg.buf())); } - else + } + else + { + boolean enoughData = true; + while (enoughData) { - enoughData = _piDecoder.decodable(msg); - if (enoughData) - { - dataBlocks.add(new ProtocolInitiation(msg)); - } - - } + int pos = msg.position(); - if(!enoughData) - { - if(!_remainingBufs.isEmpty()) + enoughData = _dataBlockDecoder.decodable(msg); + msg.position(pos); + if (enoughData) { - _remainingBufs.remove(_remainingBufs.size()-1); - ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator(); - while(iterator.hasNext() && iterator.next().available() == 0) - { - iterator.remove(); - } + dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); } - if(bais.available()!=0) + else { - byte[] remaining = new byte[bais.available()]; - bais.read(remaining); - _remainingBufs.add(new ByteArrayInputStream(remaining)); + _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); + _remainingBuf.setAutoExpand(true); + _remainingBuf.put(msg); } } } + if(firstDecode && dataBlocks.size() > 0) + { + firstDecode = false; + } return dataBlocks; } } |