summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java19
1 files changed, 13 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
index babfc3d698..d53031e21b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
@@ -24,19 +24,21 @@ import static org.apache.qpid.transport.util.Functions.*;
import static org.apache.qpid.configuration.ClientProperties.*;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.protocol.ReceiverFactory;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,12 +49,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class);
- private NetworkTransport _transport = null;
+ private MinaNetworkTransport _transport = null;
private SSLContextFactory _sslFactory = null;
private ReceiverFactory _factory = null;
private boolean _debug = false;
- public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory)
+ public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory)
{
_transport = transport;
_sslFactory = sslFactory;
@@ -60,7 +62,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter
_debug = Boolean.getBoolean("amqj.protocol.debug");
}
- public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory)
+ public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory)
{
this(transport, sslFactory, null);
}
@@ -83,6 +85,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter
public void exceptionCaught(IoSession ssn, Throwable e)
{
Receiver<java.nio.ByteBuffer> receiver = (Receiver) ssn.getAttachment();
+ _log.error("Caught exception in transport layer", e);
receiver.exception(e);
}
@@ -100,6 +103,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
SessionUtil.initialize(session);
IoFilterChain chain = session.getFilterChain();
+ if (chain.contains(ExecutorThreadModel.class.getName()))
+ {
+ chain.remove(ExecutorThreadModel.class.getName());
+ }
+ IoFilterAdapter filter = new ExecutorFilter(_transport.getExecutor());
+ chain.addFirst("sessionExecutor", filter);
// Add SSL filter
if (_sslFactory != null)
@@ -158,8 +167,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
_log.info("Idle MINA session: " + System.identityHashCode(session));
session.close();
- Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment();
- receiver.closed();
}
}
}