summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java69
1 files changed, 44 insertions, 25 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
index 2010b2dd93..ac1b959de7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -28,23 +28,27 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.PooledByteBufferAllocator;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
-import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
import org.apache.mina.transport.socket.nio.DatagramConnector;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -71,7 +75,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
public static final List<String> SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM);
private int _threads;
- private Executor _executor;
+ private ExecutorService _executor;
private ConnectionSettings _settings;
private SocketAddress _address;
private IoConnector _connector;
@@ -93,7 +97,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
- int processors = Runtime.getRuntime().availableProcessors();
+ int processors = (Runtime.getRuntime().availableProcessors() * 4) + 1;
_threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors)));
_executor = Executors.newCachedThreadPool(Threading.getThreadFactory());
}
@@ -130,7 +134,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
if (socket == null)
{
throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport");
+ "with 'socket://<SocketID>' transport");
}
_address = socket.getRemoteSocketAddress();
_connector = new ExistingSocketConnector(1, _executor);
@@ -142,25 +146,26 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
_log.info("Connecting to broker on: " + _address);
- String s = "-";
+ String name = "MINANetworkTransport(Client)";
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
for (StackTraceElement elt : trace)
{
- if (elt.getClassName().contains("Test"))
+ if (elt.getClassName().endsWith("Test"))
{
- s += elt.getClassName();
- break;
+ name += "-" + elt.getClassName();
+// break; // FIXME
}
}
-
- IoServiceConfig cfg = _connector.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s));
-
+
+ IoServiceConfig config = _connector.getDefaultConfig();
+ config.setThreadModel(ThreadModel.MANUAL);
+
// Socket based connection configuration only (TCP/SOCKET)
if (_connector instanceof SocketConnector)
{
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ SocketSessionConfig scfg = (SocketSessionConfig) config.getSessionConfig();
scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+ scfg.setKeepAlive(Boolean.getBoolean("amqj.keepAlive"));
Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE);
Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE);
scfg.setSendBufferSize(sendBufferSize);
@@ -173,7 +178,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
// Connect to the broker
- ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg);
+ ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), config);
future.join();
if (!future.isConnected())
{
@@ -181,6 +186,14 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
_session = future.getSession();
_session.setAttachment(_receiver);
+
+ IoFilterChain chain = _session.getFilterChain();
+ if (chain.contains(ExecutorThreadModel.class.getName()))
+ {
+ chain.remove(ExecutorThreadModel.class.getName());
+ }
+ IoFilterAdapter filter = new ExecutorFilter(_executor);
+ chain.addFirst("clientExecutor", filter);
return new MinaNetworkConnection(_session);
}
@@ -191,9 +204,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_acceptor = new SocketAcceptor(_threads, _executor);
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
- sconfig.setDisconnectOnUnbind(true);
- SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig();
+ SocketSessionConfig ssc = (SocketSessionConfig) _acceptor.getDefaultConfig().getSessionConfig();
ssc.setReuseAddress(true);
ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive"));
ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
@@ -215,9 +226,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_acceptor = new DatagramAcceptor(_executor);
- DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig();
- dconfig.setDisconnectOnUnbind(true);
- DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig();
+ DatagramSessionConfig dsc = (DatagramSessionConfig) _acceptor.getDefaultConfig().getSessionConfig();
dsc.setReuseAddress(true);
Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE);
Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE);
@@ -235,16 +244,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
else if (settings.getProtocol().equalsIgnoreCase(Transport.VM))
{
- _acceptor = new VmPipeAcceptor();
- _address = new VmPipeAddress(settings.getPort());
+ _acceptor = new VmPipeAcceptor();
+ _address = new VmPipeAddress(settings.getPort());
}
else
{
throw new TransportException("Unknown protocol: " + settings.getProtocol());
}
- IoServiceConfig cfg = _acceptor.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Broker)"));
+ IoAcceptorConfig config = (IoAcceptorConfig) _acceptor.getDefaultConfig();
+ config.setThreadModel(ThreadModel.MANUAL);
+ config.setDisconnectOnUnbind(true);
try
{
@@ -255,6 +265,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
throw new TransportException("Could not bind to " + _address, e);
}
}
+
+ public Executor getExecutor()
+ {
+ return _executor;
+ }
public SocketAddress getAddress()
{
@@ -275,6 +290,10 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_session.close();
}
+ if (_executor != null)
+ {
+ _executor.shutdownNow();
+ }
}
public boolean isCompatible(String protocol) {