diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index ff0bc798da..7eef73f337 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -22,6 +22,7 @@ package org.apache.qpid.codec; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; @@ -48,6 +49,9 @@ import org.apache.qpid.framing.ProtocolInitiation; */ public class AMQDecoder extends CumulativeProtocolDecoder { + + private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer"; + /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -171,4 +175,97 @@ public class AMQDecoder extends CumulativeProtocolDecoder { _expectProtocolInitiation = expectProtocolInitiation; } + + + /** + * Cumulates content of <tt>in</tt> into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> + * and the cumulative buffer is compacted after decoding ends. + * + * @throws IllegalStateException if your <tt>doDecode()</tt> returned + * <tt>true</tt> not consuming the cumulative buffer. + */ + public void decode( IoSession session, ByteBuffer in, + ProtocolDecoderOutput out ) throws Exception + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( buf != null ) + { + buf.put( in ); + buf.flip(); + } + else + { + buf = in; + } + + for( ;; ) + { + int oldPos = buf.position(); + boolean decoded = doDecode( session, buf, out ); + if( decoded ) + { + if( buf.position() == oldPos ) + { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed." ); + } + + if( !buf.hasRemaining() ) + { + break; + } + } + else + { + break; + } + } + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if ( buf.hasRemaining() ) + { + storeRemainingInSession( buf, session ); + } + else + { + removeSessionBuffer( session ); + } + } + + /** + * Releases the cumulative buffer used by the specified <tt>session</tt>. + * Please don't forget to call <tt>super.dispose( session )</tt> when + * you override this method. + */ + public void dispose( IoSession session ) throws Exception + { + removeSessionBuffer( session ); + } + + private void removeSessionBuffer(IoSession session) + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + if( buf != null ) + { + buf.release(); + session.removeAttribute( BUFFER ); + } + } + + private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) + { + ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false ); + remainingBuf.setAutoExpand( true ); + remainingBuf.put( buf ); + session.setAttribute( BUFFER, remainingBuf ); + } + } |