diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java index 2010b2dd93..ac1b959de7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -28,23 +28,27 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoAcceptorConfig; import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.common.PooledByteBufferAllocator; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.ThreadModel; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.transport.socket.nio.DatagramAcceptor; -import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig; import org.apache.mina.transport.socket.nio.DatagramConnector; import org.apache.mina.transport.socket.nio.DatagramSessionConfig; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; @@ -71,7 +75,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN public static final List<String> SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM); private int _threads; - private Executor _executor; + private ExecutorService _executor; private ConnectionSettings _settings; private SocketAddress _address; private IoConnector _connector; @@ -93,7 +97,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - int processors = Runtime.getRuntime().availableProcessors(); + int processors = (Runtime.getRuntime().availableProcessors() * 4) + 1; _threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors))); _executor = Executors.newCachedThreadPool(Threading.getThreadFactory()); } @@ -130,7 +134,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN if (socket == null) { throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport"); + "with 'socket://<SocketID>' transport"); } _address = socket.getRemoteSocketAddress(); _connector = new ExistingSocketConnector(1, _executor); @@ -142,25 +146,26 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _log.info("Connecting to broker on: " + _address); - String s = "-"; + String name = "MINANetworkTransport(Client)"; StackTraceElement[] trace = Thread.currentThread().getStackTrace(); for (StackTraceElement elt : trace) { - if (elt.getClassName().contains("Test")) + if (elt.getClassName().endsWith("Test")) { - s += elt.getClassName(); - break; + name += "-" + elt.getClassName(); +// break; // FIXME } } - - IoServiceConfig cfg = _connector.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s)); - + + IoServiceConfig config = _connector.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + // Socket based connection configuration only (TCP/SOCKET) if (_connector instanceof SocketConnector) { - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) config.getSessionConfig(); scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + scfg.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); scfg.setSendBufferSize(sendBufferSize); @@ -173,7 +178,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } // Connect to the broker - ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg); + ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), config); future.join(); if (!future.isConnected()) { @@ -181,6 +186,14 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _session = future.getSession(); _session.setAttachment(_receiver); + + IoFilterChain chain = _session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_executor); + chain.addFirst("clientExecutor", filter); return new MinaNetworkConnection(_session); } @@ -191,9 +204,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new SocketAcceptor(_threads, _executor); - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setDisconnectOnUnbind(true); - SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig(); + SocketSessionConfig ssc = (SocketSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); ssc.setReuseAddress(true); ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); @@ -215,9 +226,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new DatagramAcceptor(_executor); - DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig(); - dconfig.setDisconnectOnUnbind(true); - DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig(); + DatagramSessionConfig dsc = (DatagramSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); dsc.setReuseAddress(true); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); @@ -235,16 +244,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } else if (settings.getProtocol().equalsIgnoreCase(Transport.VM)) { - _acceptor = new VmPipeAcceptor(); - _address = new VmPipeAddress(settings.getPort()); + _acceptor = new VmPipeAcceptor(); + _address = new VmPipeAddress(settings.getPort()); } else { throw new TransportException("Unknown protocol: " + settings.getProtocol()); } - IoServiceConfig cfg = _acceptor.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Broker)")); + IoAcceptorConfig config = (IoAcceptorConfig) _acceptor.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + config.setDisconnectOnUnbind(true); try { @@ -255,6 +265,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN throw new TransportException("Could not bind to " + _address, e); } } + + public Executor getExecutor() + { + return _executor; + } public SocketAddress getAddress() { @@ -275,6 +290,10 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _session.close(); } + if (_executor != null) + { + _executor.shutdownNow(); + } } public boolean isCompatible(String protocol) { |