diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 53 |
1 files changed, 24 insertions, 29 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 99366101d1..be75fc150f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -20,20 +20,22 @@ */ package org.apache.qpid.client.protocol; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; + import org.apache.mina.filter.codec.ProtocolCodecException; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; @@ -42,32 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.Event; import org.apache.qpid.pool.Job; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - /** * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the @@ -107,9 +106,6 @@ import java.util.concurrent.CountDownLatch; * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Create the filter chain to filter this handlers events. - * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. - * * <tr><td> Maintain fail-over state. * <tr><td> * </table> @@ -191,9 +187,8 @@ public class AMQProtocolHandler implements ProtocolEngine _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); _codecFactory = new AMQCodecFactory(false, _protocolSession); - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true); - _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); _poolReference.acquireExecutorService(); _failoverHandler = new FailoverHandler(this); } @@ -436,7 +431,7 @@ public class AMQProtocolHandler implements ProtocolEngine _readBytes += msg.remaining(); final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() { @Override public void run() @@ -495,7 +490,7 @@ public class AMQProtocolHandler implements ProtocolEngine } } } - })); + }); } catch (Exception e) { |