summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java60
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java35
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java19
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java69
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java39
7 files changed, 139 insertions, 125 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 37e731206c..a4db16742a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,13 +20,11 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
@@ -35,19 +33,16 @@ import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Struct;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.transport.codec.BBDecoder;
/**
* Assembler
*/
public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
- private static final Logger _log = LoggerFactory.getLogger(Assembler.class);
-
private final Receiver<ProtocolEvent> receiver;
- private final Map<Integer,List<Frame>> segments;
- private final Method[] incomplete;
+ private final Map<Integer, List<Frame>> segments;
+ private final Map<Integer, Method> incomplete;
private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
{
public BBDecoder initialValue()
@@ -59,8 +54,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
public Assembler(Receiver<ProtocolEvent> receiver)
{
this.receiver = receiver;
- segments = new HashMap<Integer,List<Frame>>();
- incomplete = new Method[64*1024];
+ segments = new HashMap<Integer, List<Frame>>();
+ incomplete = new HashMap<Integer, Method>();
+// incomplete = new Method[64*1024];
}
private int segmentKey(Frame frame)
@@ -102,12 +98,12 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
public void exception(Throwable t)
{
- this.receiver.exception(t);
+ receiver.exception(t);
}
public void closed()
{
- this.receiver.closed();
+ receiver.closed();
}
public void init(ProtocolHeader header)
@@ -188,7 +184,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.read(dec);
if (command.hasPayload())
{
- incomplete[channel] = command;
+ incomplete.put(channel, command);
}
else
{
@@ -196,8 +192,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
break;
case HEADER:
- command = incomplete[channel];
- List<Struct> structs = new ArrayList(2);
+ command = incomplete.get(channel);
+ List<Struct> structs = new ArrayList<Struct>(2);
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
@@ -205,14 +201,14 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.setHeader(new Header(structs));
if (frame.isLastSegment())
{
- incomplete[channel] = null;
+ incomplete.remove(channel);
emit(channel, command);
}
break;
case BODY:
- command = incomplete[channel];
+ command = incomplete.get(channel);
command.setBody(segment);
- incomplete[channel] = null;
+ incomplete.remove(channel);
emit(channel, command);
break;
default:
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 87cabeb874..08b3fae528 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -20,9 +20,15 @@
*/
package org.apache.qpid.transport.network;
-import static org.apache.qpid.transport.network.Frame.*;
-
import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
@@ -35,19 +41,14 @@ import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
/**
* Disassembler converts protocol events to byte buffers that can be sent on the network.
*/
public final class Disassembler implements Sender<ProtocolEvent>,
ProtocolDelegate<Void>
{
-
private final Sender<ByteBuffer> sender;
private final int maxPayload;
- private final ByteBuffer header;
private final Object sendlock = new Object();
private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>()
{
@@ -66,8 +67,6 @@ public final class Disassembler implements Sender<ProtocolEvent>,
}
this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
- this.header = ByteBuffer.allocate(HEADER_SIZE);
- this.header.order(ByteOrder.BIG_ENDIAN);
}
@@ -78,39 +77,35 @@ public final class Disassembler implements Sender<ProtocolEvent>,
public void flush()
{
- synchronized (sendlock)
- {
- sender.flush();
- }
+ sender.flush();
}
public void close()
{
- synchronized (sendlock)
- {
- sender.close();
- }
+ sender.close();
}
private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
{
synchronized (sendlock)
{
- header.put(0, flags);
- header.put(1, type);
- header.putShort(2, (short) (size + HEADER_SIZE));
- header.put(5, track);
- header.putShort(6, (short) channel);
-
- header.rewind();
-
- sender.send(header);
- sender.flush();
+ ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE);
+ data.order(ByteOrder.BIG_ENDIAN);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+ data.position(HEADER_SIZE);
int limit = buf.limit();
buf.limit(buf.position() + size);
- sender.send(buf);
+ data.put(buf);
buf.limit(limit);
+
+ data.rewind();
+ sender.send(data);
}
}
@@ -166,14 +161,6 @@ public final class Disassembler implements Sender<ProtocolEvent>,
method(method, SegmentType.COMMAND);
}
- private ByteBuffer copy(ByteBuffer src)
- {
- ByteBuffer buf = ByteBuffer.allocate(src.remaining());
- buf.put(src);
- buf.flip();
- return buf;
- }
-
private void method(Method method, SegmentType type)
{
BBEncoder enc = encoder.get();
@@ -228,7 +215,6 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
fragment(LAST_SEG, SegmentType.BODY, method, body);
}
-
}
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index bb7f059d15..c17527c19c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.transport.network;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import org.apache.qpid.transport.TransportException;
@@ -34,7 +34,7 @@ public class Transport
public static final String UDP = "udp";
public static final String VM = "vm";
public static final String SOCKET = "socket";
- public static final String MULTICAST = "multicast";
+ public static final String MULTICAST = "multicast"; // TODO
public static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
public static final long DEFAULT_TIMEOUT = 60000;
@@ -43,20 +43,35 @@ public class Transport
public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport";
public static final String IO_TRANSPORT = "org.apache.qpid.transport.network.io.IoNetworkTransport";
- public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport";
- public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport";
+ public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; // TODO
+ public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; // TODO
- private static final List<String> _incoming = new ArrayList<String>();
- private static final List<String> _outgoing = new ArrayList<String>();
+ private static final List<String> _incoming = new LinkedList<String>();
+ private static final List<String> _outgoing = new LinkedList<String>();
public static void registerIncomingTransport(Class<? extends IncomingNetworkTransport> transport)
{
- _incoming.add(transport.getName());
+ registerTransport(_incoming, transport.getName());
+ }
+
+ public static void registerIncomingTransport(String transport)
+ {
+ registerTransport(_incoming, transport);
}
public static void registerOutgoingTransport(Class<? extends OutgoingNetworkTransport> transport)
{
- _outgoing.add(transport.getName());
+ registerTransport(_outgoing, transport.getName());
+ }
+
+ public static void registerOutgoingTransport(String transport)
+ {
+ registerTransport(_outgoing, transport);
+ }
+
+ private static void registerTransport(List<String> registered, String transport)
+ {
+ registered.add(transport);
}
public static IncomingNetworkTransport getIncomingTransport() throws TransportException
@@ -71,7 +86,7 @@ public class Transport
public static OutgoingNetworkTransport getOutgoingTransport(String protocol) throws TransportException
{
- return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, MINA_TRANSPORT, protocol);
+ return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, IO_TRANSPORT, protocol);
}
private static NetworkTransport getTransport(String direction, List<String> registered, String defaultTransport, String protocol)
@@ -95,7 +110,7 @@ public class Transport
try
{
- String transport = System.getProperty("qpid.transport." + direction, MINA_TRANSPORT);
+ String transport = System.getProperty("qpid.transport." + direction, defaultTransport);
Class<?> clazz = Class.forName(transport);
NetworkTransport network = (NetworkTransport) clazz.newInstance();
if (protocol == null || network.isCompatible(protocol))
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index aa480554ea..0aee08adbe 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -66,8 +66,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
{
_socket = new Socket();
- _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize());
+ _log.debug("default SO_RCVBUF " + _socket.getReceiveBufferSize());
+ _log.debug("default SO_SNDBUF " + _socket.getSendBufferSize());
_socket.setTcpNoDelay(noDelay);
_socket.setKeepAlive(keepAlive);
@@ -75,8 +75,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport
_socket.setReceiveBufferSize(receiveBufferSize);
_socket.setReuseAddress(true);
- _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize());
+ _log.debug("new SO_RCVBUF " + _socket.getReceiveBufferSize());
+ _log.debug("new SO_SNDBUF " + _socket.getSendBufferSize());
InetAddress address = InetAddress.getByName(settings.getHost());
_socket.connect(new InetSocketAddress(address, settings.getPort()));
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
index babfc3d698..d53031e21b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
@@ -24,19 +24,21 @@ import static org.apache.qpid.transport.util.Functions.*;
import static org.apache.qpid.configuration.ClientProperties.*;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.util.SessionUtil;
import org.apache.qpid.protocol.ReceiverFactory;
import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.NetworkTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,12 +49,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class);
- private NetworkTransport _transport = null;
+ private MinaNetworkTransport _transport = null;
private SSLContextFactory _sslFactory = null;
private ReceiverFactory _factory = null;
private boolean _debug = false;
- public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory)
+ public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory)
{
_transport = transport;
_sslFactory = sslFactory;
@@ -60,7 +62,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter
_debug = Boolean.getBoolean("amqj.protocol.debug");
}
- public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory)
+ public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory)
{
this(transport, sslFactory, null);
}
@@ -83,6 +85,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter
public void exceptionCaught(IoSession ssn, Throwable e)
{
Receiver<java.nio.ByteBuffer> receiver = (Receiver) ssn.getAttachment();
+ _log.error("Caught exception in transport layer", e);
receiver.exception(e);
}
@@ -100,6 +103,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
SessionUtil.initialize(session);
IoFilterChain chain = session.getFilterChain();
+ if (chain.contains(ExecutorThreadModel.class.getName()))
+ {
+ chain.remove(ExecutorThreadModel.class.getName());
+ }
+ IoFilterAdapter filter = new ExecutorFilter(_transport.getExecutor());
+ chain.addFirst("sessionExecutor", filter);
// Add SSL filter
if (_sslFactory != null)
@@ -158,8 +167,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
_log.info("Idle MINA session: " + System.identityHashCode(session));
session.close();
- Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment();
- receiver.closed();
}
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
index 2010b2dd93..ac1b959de7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -28,23 +28,27 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExecutorThreadModel;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.PooledByteBufferAllocator;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
-import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
import org.apache.mina.transport.socket.nio.DatagramConnector;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -71,7 +75,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
public static final List<String> SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM);
private int _threads;
- private Executor _executor;
+ private ExecutorService _executor;
private ConnectionSettings _settings;
private SocketAddress _address;
private IoConnector _connector;
@@ -93,7 +97,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
- int processors = Runtime.getRuntime().availableProcessors();
+ int processors = (Runtime.getRuntime().availableProcessors() * 4) + 1;
_threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors)));
_executor = Executors.newCachedThreadPool(Threading.getThreadFactory());
}
@@ -130,7 +134,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
if (socket == null)
{
throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport");
+ "with 'socket://<SocketID>' transport");
}
_address = socket.getRemoteSocketAddress();
_connector = new ExistingSocketConnector(1, _executor);
@@ -142,25 +146,26 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
_log.info("Connecting to broker on: " + _address);
- String s = "-";
+ String name = "MINANetworkTransport(Client)";
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
for (StackTraceElement elt : trace)
{
- if (elt.getClassName().contains("Test"))
+ if (elt.getClassName().endsWith("Test"))
{
- s += elt.getClassName();
- break;
+ name += "-" + elt.getClassName();
+// break; // FIXME
}
}
-
- IoServiceConfig cfg = _connector.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s));
-
+
+ IoServiceConfig config = _connector.getDefaultConfig();
+ config.setThreadModel(ThreadModel.MANUAL);
+
// Socket based connection configuration only (TCP/SOCKET)
if (_connector instanceof SocketConnector)
{
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ SocketSessionConfig scfg = (SocketSessionConfig) config.getSessionConfig();
scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+ scfg.setKeepAlive(Boolean.getBoolean("amqj.keepAlive"));
Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE);
Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE);
scfg.setSendBufferSize(sendBufferSize);
@@ -173,7 +178,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
// Connect to the broker
- ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg);
+ ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), config);
future.join();
if (!future.isConnected())
{
@@ -181,6 +186,14 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
_session = future.getSession();
_session.setAttachment(_receiver);
+
+ IoFilterChain chain = _session.getFilterChain();
+ if (chain.contains(ExecutorThreadModel.class.getName()))
+ {
+ chain.remove(ExecutorThreadModel.class.getName());
+ }
+ IoFilterAdapter filter = new ExecutorFilter(_executor);
+ chain.addFirst("clientExecutor", filter);
return new MinaNetworkConnection(_session);
}
@@ -191,9 +204,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_acceptor = new SocketAcceptor(_threads, _executor);
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
- sconfig.setDisconnectOnUnbind(true);
- SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig();
+ SocketSessionConfig ssc = (SocketSessionConfig) _acceptor.getDefaultConfig().getSessionConfig();
ssc.setReuseAddress(true);
ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive"));
ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
@@ -215,9 +226,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_acceptor = new DatagramAcceptor(_executor);
- DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig();
- dconfig.setDisconnectOnUnbind(true);
- DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig();
+ DatagramSessionConfig dsc = (DatagramSessionConfig) _acceptor.getDefaultConfig().getSessionConfig();
dsc.setReuseAddress(true);
Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE);
Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE);
@@ -235,16 +244,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
}
else if (settings.getProtocol().equalsIgnoreCase(Transport.VM))
{
- _acceptor = new VmPipeAcceptor();
- _address = new VmPipeAddress(settings.getPort());
+ _acceptor = new VmPipeAcceptor();
+ _address = new VmPipeAddress(settings.getPort());
}
else
{
throw new TransportException("Unknown protocol: " + settings.getProtocol());
}
- IoServiceConfig cfg = _acceptor.getDefaultConfig();
- cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Broker)"));
+ IoAcceptorConfig config = (IoAcceptorConfig) _acceptor.getDefaultConfig();
+ config.setThreadModel(ThreadModel.MANUAL);
+ config.setDisconnectOnUnbind(true);
try
{
@@ -255,6 +265,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
throw new TransportException("Could not bind to " + _address, e);
}
}
+
+ public Executor getExecutor()
+ {
+ return _executor;
+ }
public SocketAddress getAddress()
{
@@ -275,6 +290,10 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_session.close();
}
+ if (_executor != null)
+ {
+ _executor.shutdownNow();
+ }
}
public boolean isCompatible(String protocol) {
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
index 5fc3032d35..10d70ed34f 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
@@ -26,20 +26,16 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.qpid.transport.network.Transport;
/**
* MinaSender
*/
public class MinaSender implements Sender<java.nio.ByteBuffer>
{
- private static final Logger _log = LoggerFactory.getLogger(MinaSender.class);
-
private final IoSession _session;
- private WriteFuture _lastWrite;
- private int _idleTimeout = 0;
+ private int _idle = 0;
+ private WriteFuture _written;
public MinaSender(IoSession session)
{
@@ -52,41 +48,36 @@ public class MinaSender implements Sender<java.nio.ByteBuffer>
{
throw new TransportException("attempted to write to a closed socket");
}
- ByteBuffer mina = ByteBuffer.allocate(buf.capacity());
- mina.put(buf);
- mina.flip();
- flush();
- _lastWrite = _session.write(mina);
+ _written = _session.write(ByteBuffer.wrap(buf));
}
public synchronized void flush()
{
- if (_lastWrite != null)
+ if (_written != null)
{
- _lastWrite.join();
- if (!_lastWrite.isWritten())
- {
- throw new RuntimeException("Error flushing buffer");
- }
+ _written.join(Transport.DEFAULT_TIMEOUT);
+ if (!_written.isWritten())
+ {
+ throw new TransportException("Error flushing data buffer");
+ }
}
}
- public void close()
+ public synchronized void close()
{
- // MINA will sometimes throw away in-progress writes when you ask it to close
flush();
CloseFuture closed = _session.close();
closed.join();
}
- public void setIdleTimeout(int i)
+ public void setIdleTimeout(int idle)
{
- _idleTimeout = i;
- _session.setWriteTimeout(_idleTimeout);
+ _idle = idle;
+ _session.setWriteTimeout(_idle);
}
public long getIdleTimeout()
{
- return _idleTimeout;
+ return _idle;
}
}