diff options
Diffstat (limited to 'M4-RCs/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java')
-rw-r--r-- | M4-RCs/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java | 440 |
1 files changed, 0 insertions, 440 deletions
diff --git a/M4-RCs/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/M4-RCs/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java deleted file mode 100644 index b8c6f29720..0000000000 --- a/M4-RCs/qpid/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java +++ /dev/null @@ -1,440 +0,0 @@ -package org.apache.mina.filter.codec; - - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -import org.apache.mina.common.*; -import org.apache.mina.common.support.DefaultWriteFuture; -import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; -import org.apache.mina.util.SessionLog; -import org.apache.mina.util.Queue; - - -public class QpidProtocolCodecFilter extends IoFilterAdapter -{ - public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder"; - public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder"; - - private static final Class[] EMPTY_PARAMS = new Class[0]; - private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] ); - - private final ProtocolCodecFactory factory; - - public QpidProtocolCodecFilter( ProtocolCodecFactory factory ) - { - if( factory == null ) - { - throw new NullPointerException( "factory" ); - } - this.factory = factory; - } - - public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder ) - { - if( encoder == null ) - { - throw new NullPointerException( "encoder" ); - } - if( decoder == null ) - { - throw new NullPointerException( "decoder" ); - } - - this.factory = new ProtocolCodecFactory() - { - public ProtocolEncoder getEncoder() - { - return encoder; - } - - public ProtocolDecoder getDecoder() - { - return decoder; - } - }; - } - - public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass ) - { - if( encoderClass == null ) - { - throw new NullPointerException( "encoderClass" ); - } - if( decoderClass == null ) - { - throw new NullPointerException( "decoderClass" ); - } - if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) ) - { - throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() ); - } - if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) ) - { - throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() ); - } - try - { - encoderClass.getConstructor( EMPTY_PARAMS ); - } - catch( NoSuchMethodException e ) - { - throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." ); - } - try - { - decoderClass.getConstructor( EMPTY_PARAMS ); - } - catch( NoSuchMethodException e ) - { - throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." ); - } - - this.factory = new ProtocolCodecFactory() - { - public ProtocolEncoder getEncoder() throws Exception - { - return ( ProtocolEncoder ) encoderClass.newInstance(); - } - - public ProtocolDecoder getDecoder() throws Exception - { - return ( ProtocolDecoder ) decoderClass.newInstance(); - } - }; - } - - public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception - { - if( parent.contains( ProtocolCodecFilter.class ) ) - { - throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." ); - } - } - - public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception - { - if( !( message instanceof ByteBuffer ) ) - { - nextFilter.messageReceived( session, message ); - return; - } - - ByteBuffer in = ( ByteBuffer ) message; - ProtocolDecoder decoder = getDecoder( session ); - ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); - - try - { - decoder.decode( session, in, decoderOut ); - } - catch( Throwable t ) - { - ProtocolDecoderException pde; - if( t instanceof ProtocolDecoderException ) - { - pde = ( ProtocolDecoderException ) t; - } - else - { - pde = new ProtocolDecoderException( t ); - } - pde.setHexdump( in.getHexDump() ); - throw pde; - } - finally - { - // Dispose the decoder if this session is connectionless. - if( session.getTransportType().isConnectionless() ) - { - disposeDecoder( session ); - } - - // Release the read buffer. - in.release(); - - decoderOut.flush(); - } - } - - public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception - { - if( message instanceof HiddenByteBuffer ) - { - return; - } - - if( !( message instanceof MessageByteBuffer ) ) - { - nextFilter.messageSent( session, message ); - return; - } - - nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); - } - - public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception - { - Object message = writeRequest.getMessage(); - if( message instanceof ByteBuffer ) - { - nextFilter.filterWrite( session, writeRequest ); - return; - } - - ProtocolEncoder encoder = getEncoder( session ); - ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest ); - - try - { - encoder.encode( session, message, encoderOut ); - encoderOut.flush(); - nextFilter.filterWrite( - session, - new IoFilter.WriteRequest( - new MessageByteBuffer( writeRequest.getMessage() ), - writeRequest.getFuture(), writeRequest.getDestination() ) ); - } - catch( Throwable t ) - { - ProtocolEncoderException pee; - if( t instanceof ProtocolEncoderException ) - { - pee = ( ProtocolEncoderException ) t; - } - else - { - pee = new ProtocolEncoderException( t ); - } - throw pee; - } - finally - { - // Dispose the encoder if this session is connectionless. - if( session.getTransportType().isConnectionless() ) - { - disposeEncoder( session ); - } - } - } - - public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception - { - // Call finishDecode() first when a connection is closed. - ProtocolDecoder decoder = getDecoder( session ); - ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); - try - { - decoder.finishDecode( session, decoderOut ); - } - catch( Throwable t ) - { - ProtocolDecoderException pde; - if( t instanceof ProtocolDecoderException ) - { - pde = ( ProtocolDecoderException ) t; - } - else - { - pde = new ProtocolDecoderException( t ); - } - throw pde; - } - finally - { - // Dispose all. - disposeEncoder( session ); - disposeDecoder( session ); - - decoderOut.flush(); - } - - nextFilter.sessionClosed( session ); - } - - private ProtocolEncoder getEncoder( IoSession session ) throws Exception - { - ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER ); - if( encoder == null ) - { - encoder = factory.getEncoder(); - session.setAttribute( ENCODER, encoder ); - } - return encoder; - } - - private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) - { - return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest ); - } - - private ProtocolDecoder getDecoder( IoSession session ) throws Exception - { - ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER ); - if( decoder == null ) - { - decoder = factory.getDecoder(); - session.setAttribute( DECODER, decoder ); - } - return decoder; - } - - private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter ) - { - return new SimpleProtocolDecoderOutput( session, nextFilter ); - } - - private void disposeEncoder( IoSession session ) - { - ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER ); - if( encoder == null ) - { - return; - } - - try - { - encoder.dispose( session ); - } - catch( Throwable t ) - { - SessionLog.warn( - session, - "Failed to dispose: " + encoder.getClass().getName() + - " (" + encoder + ')' ); - } - } - - private void disposeDecoder( IoSession session ) - { - ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER ); - if( decoder == null ) - { - return; - } - - try - { - decoder.dispose( session ); - } - catch( Throwable t ) - { - SessionLog.warn( - session, - "Falied to dispose: " + decoder.getClass().getName() + - " (" + decoder + ')' ); - } - } - - private static class HiddenByteBuffer extends ByteBufferProxy - { - private HiddenByteBuffer( ByteBuffer buf ) - { - super( buf ); - } - } - - private static class MessageByteBuffer extends ByteBufferProxy - { - private final Object message; - - private MessageByteBuffer( Object message ) - { - super( EMPTY_BUFFER ); - this.message = message; - } - - public void acquire() - { - // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message - } - - public void release() - { - // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message - } - } - - private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput - { - private ByteBuffer buffer; - - private final IoSession session; - private final IoFilter.NextFilter nextFilter; - private final IoFilter.WriteRequest writeRequest; - - public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) - { - this.session = session; - this.nextFilter = nextFilter; - this.writeRequest = writeRequest; - } - - - - public void write( ByteBuffer buf ) - { - if(buffer != null) - { - flush(); - } - buffer = buf; - } - - public void mergeAll() - { - } - - public WriteFuture flush() - { - WriteFuture future = null; - if( buffer == null ) - { - return null; - } - else - { - ByteBuffer buf = buffer; - // Flush only when the buffer has remaining. - if( buf.hasRemaining() ) - { - future = doFlush( buf ); - } - - } - - return future; - } - - - protected WriteFuture doFlush( ByteBuffer buf ) - { - WriteFuture future = new DefaultWriteFuture( session ); - nextFilter.filterWrite( - session, - new IoFilter.WriteRequest( - buf, - future, writeRequest.getDestination() ) ); - return future; - } - } -} - |