diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java index babfc3d698..d53031e21b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -24,19 +24,21 @@ import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.configuration.ClientProperties.*; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.LoggingFilter; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.util.SessionUtil; import org.apache.qpid.protocol.ReceiverFactory; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,12 +49,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter { private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class); - private NetworkTransport _transport = null; + private MinaNetworkTransport _transport = null; private SSLContextFactory _sslFactory = null; private ReceiverFactory _factory = null; private boolean _debug = false; - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) { _transport = transport; _sslFactory = sslFactory; @@ -60,7 +62,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter _debug = Boolean.getBoolean("amqj.protocol.debug"); } - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory) { this(transport, sslFactory, null); } @@ -83,6 +85,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter public void exceptionCaught(IoSession ssn, Throwable e) { Receiver<java.nio.ByteBuffer> receiver = (Receiver) ssn.getAttachment(); + _log.error("Caught exception in transport layer", e); receiver.exception(e); } @@ -100,6 +103,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter SessionUtil.initialize(session); IoFilterChain chain = session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_transport.getExecutor()); + chain.addFirst("sessionExecutor", filter); // Add SSL filter if (_sslFactory != null) @@ -158,8 +167,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter { _log.info("Idle MINA session: " + System.identityHashCode(session)); session.close(); - Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment(); - receiver.closed(); } } } |