diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-06 16:05:46 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-06 16:05:46 +0000 |
commit | 80ba5c5efdc23e3922b8f8f5152ceeaefa6951b6 (patch) | |
tree | 2eb2141eb77d43701e718b5b5ab1cbd07401015f /qpid/java/common/src/main | |
parent | 1d44d6e7a3369fb7773ba50d02c3baa8955da382 (diff) | |
download | qpid-python-grkvlt-network-20101013.tar.gz |
Attempt one at merge from r1021441:HEADgrkvlt-network-20101013
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1042697 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src/main')
8 files changed, 159 insertions, 126 deletions
diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd index 6cd8a52976..ef56ecec9e 100755 --- a/qpid/java/common/src/main/java/common.bnd +++ b/qpid/java/common/src/main/java/common.bnd @@ -1,4 +1,23 @@ -ver: 0.7.0
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+ver: 0.9.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 37e731206c..a4db16742a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -35,19 +33,16 @@ import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.codec.BBDecoder; /** * Assembler */ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { - private static final Logger _log = LoggerFactory.getLogger(Assembler.class); - private final Receiver<ProtocolEvent> receiver; - private final Map<Integer,List<Frame>> segments; - private final Method[] incomplete; + private final Map<Integer, List<Frame>> segments; + private final Map<Integer, Method> incomplete; private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() { public BBDecoder initialValue() @@ -59,8 +54,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate public Assembler(Receiver<ProtocolEvent> receiver) { this.receiver = receiver; - segments = new HashMap<Integer,List<Frame>>(); - incomplete = new Method[64*1024]; + segments = new HashMap<Integer, List<Frame>>(); + incomplete = new HashMap<Integer, Method>(); +// incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -102,12 +98,12 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate public void exception(Throwable t) { - this.receiver.exception(t); + receiver.exception(t); } public void closed() { - this.receiver.closed(); + receiver.closed(); } public void init(ProtocolHeader header) @@ -188,7 +184,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.read(dec); if (command.hasPayload()) { - incomplete[channel] = command; + incomplete.put(channel, command); } else { @@ -196,8 +192,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } break; case HEADER: - command = incomplete[channel]; - List<Struct> structs = new ArrayList(2); + command = incomplete.get(channel); + List<Struct> structs = new ArrayList<Struct>(2); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); @@ -205,14 +201,14 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.setHeader(new Header(structs)); if (frame.isLastSegment()) { - incomplete[channel] = null; + incomplete.remove(channel); emit(channel, command); } break; case BODY: - command = incomplete[channel]; + command = incomplete.get(channel); command.setBody(segment); - incomplete[channel] = null; + incomplete.remove(channel); emit(channel, command); break; default: diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 87cabeb874..08b3fae528 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,9 +20,15 @@ */ package org.apache.qpid.transport.network; -import static org.apache.qpid.transport.network.Frame.*; - import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -35,19 +41,14 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - /** * Disassembler converts protocol events to byte buffers that can be sent on the network. */ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void> { - private final Sender<ByteBuffer> sender; private final int maxPayload; - private final ByteBuffer header; private final Object sendlock = new Object(); private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>() { @@ -66,8 +67,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, } this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; - this.header = ByteBuffer.allocate(HEADER_SIZE); - this.header.order(ByteOrder.BIG_ENDIAN); } @@ -78,39 +77,35 @@ public final class Disassembler implements Sender<ProtocolEvent>, public void flush() { - synchronized (sendlock) - { - sender.flush(); - } + sender.flush(); } public void close() { - synchronized (sendlock) - { - sender.close(); - } + sender.close(); } private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) { synchronized (sendlock) { - header.put(0, flags); - header.put(1, type); - header.putShort(2, (short) (size + HEADER_SIZE)); - header.put(5, track); - header.putShort(6, (short) channel); - - header.rewind(); - - sender.send(header); - sender.flush(); + ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE); + data.order(ByteOrder.BIG_ENDIAN); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + data.position(HEADER_SIZE); int limit = buf.limit(); buf.limit(buf.position() + size); - sender.send(buf); + data.put(buf); buf.limit(limit); + + data.rewind(); + sender.send(data); } } @@ -166,14 +161,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, method(method, SegmentType.COMMAND); } - private ByteBuffer copy(ByteBuffer src) - { - ByteBuffer buf = ByteBuffer.allocate(src.remaining()); - buf.put(src); - buf.flip(); - return buf; - } - private void method(Method method, SegmentType type) { BBEncoder enc = encoder.get(); @@ -228,7 +215,6 @@ public final class Disassembler implements Sender<ProtocolEvent>, { fragment(LAST_SEG, SegmentType.BODY, method, body); } - } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index bb7f059d15..c17527c19c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.transport.network; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.apache.qpid.transport.TransportException; @@ -34,7 +34,7 @@ public class Transport public static final String UDP = "udp"; public static final String VM = "vm"; public static final String SOCKET = "socket"; - public static final String MULTICAST = "multicast"; + public static final String MULTICAST = "multicast"; // TODO public static final int DEFAULT_BUFFER_SIZE = 32 * 1024; public static final long DEFAULT_TIMEOUT = 60000; @@ -43,20 +43,35 @@ public class Transport public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; public static final String IO_TRANSPORT = "org.apache.qpid.transport.network.io.IoNetworkTransport"; - public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; - public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; + public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; // TODO + public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; // TODO - private static final List<String> _incoming = new ArrayList<String>(); - private static final List<String> _outgoing = new ArrayList<String>(); + private static final List<String> _incoming = new LinkedList<String>(); + private static final List<String> _outgoing = new LinkedList<String>(); public static void registerIncomingTransport(Class<? extends IncomingNetworkTransport> transport) { - _incoming.add(transport.getName()); + registerTransport(_incoming, transport.getName()); + } + + public static void registerIncomingTransport(String transport) + { + registerTransport(_incoming, transport); } public static void registerOutgoingTransport(Class<? extends OutgoingNetworkTransport> transport) { - _outgoing.add(transport.getName()); + registerTransport(_outgoing, transport.getName()); + } + + public static void registerOutgoingTransport(String transport) + { + registerTransport(_outgoing, transport); + } + + private static void registerTransport(List<String> registered, String transport) + { + registered.add(transport); } public static IncomingNetworkTransport getIncomingTransport() throws TransportException @@ -71,7 +86,7 @@ public class Transport public static OutgoingNetworkTransport getOutgoingTransport(String protocol) throws TransportException { - return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, MINA_TRANSPORT, protocol); + return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, IO_TRANSPORT, protocol); } private static NetworkTransport getTransport(String direction, List<String> registered, String defaultTransport, String protocol) @@ -95,7 +110,7 @@ public class Transport try { - String transport = System.getProperty("qpid.transport." + direction, MINA_TRANSPORT); + String transport = System.getProperty("qpid.transport." + direction, defaultTransport); Class<?> clazz = Class.forName(transport); NetworkTransport network = (NetworkTransport) clazz.newInstance(); if (protocol == null || network.isCompatible(protocol)) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index aa480554ea..0aee08adbe 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -66,8 +66,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport { _socket = new Socket(); - _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize()); + _log.debug("default SO_RCVBUF " + _socket.getReceiveBufferSize()); + _log.debug("default SO_SNDBUF " + _socket.getSendBufferSize()); _socket.setTcpNoDelay(noDelay); _socket.setKeepAlive(keepAlive); @@ -75,8 +75,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport _socket.setReceiveBufferSize(receiveBufferSize); _socket.setReuseAddress(true); - _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize()); + _log.debug("new SO_RCVBUF " + _socket.getReceiveBufferSize()); + _log.debug("new SO_SNDBUF " + _socket.getSendBufferSize()); InetAddress address = InetAddress.getByName(settings.getHost()); _socket.connect(new InetSocketAddress(address, settings.getPort())); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java index babfc3d698..d53031e21b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -24,19 +24,21 @@ import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.configuration.ClientProperties.*; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.LoggingFilter; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.util.SessionUtil; import org.apache.qpid.protocol.ReceiverFactory; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,12 +49,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter { private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class); - private NetworkTransport _transport = null; + private MinaNetworkTransport _transport = null; private SSLContextFactory _sslFactory = null; private ReceiverFactory _factory = null; private boolean _debug = false; - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) { _transport = transport; _sslFactory = sslFactory; @@ -60,7 +62,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter _debug = Boolean.getBoolean("amqj.protocol.debug"); } - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory) { this(transport, sslFactory, null); } @@ -83,6 +85,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter public void exceptionCaught(IoSession ssn, Throwable e) { Receiver<java.nio.ByteBuffer> receiver = (Receiver) ssn.getAttachment(); + _log.error("Caught exception in transport layer", e); receiver.exception(e); } @@ -100,6 +103,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter SessionUtil.initialize(session); IoFilterChain chain = session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_transport.getExecutor()); + chain.addFirst("sessionExecutor", filter); // Add SSL filter if (_sslFactory != null) @@ -158,8 +167,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter { _log.info("Idle MINA session: " + System.identityHashCode(session)); session.close(); - Receiver<java.nio.ByteBuffer> receiver = (Receiver) session.getAttachment(); - receiver.closed(); } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java index 2010b2dd93..ac1b959de7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -28,23 +28,27 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoAcceptorConfig; import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.common.PooledByteBufferAllocator; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.ThreadModel; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.transport.socket.nio.DatagramAcceptor; -import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig; import org.apache.mina.transport.socket.nio.DatagramConnector; import org.apache.mina.transport.socket.nio.DatagramSessionConfig; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; @@ -71,7 +75,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN public static final List<String> SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM); private int _threads; - private Executor _executor; + private ExecutorService _executor; private ConnectionSettings _settings; private SocketAddress _address; private IoConnector _connector; @@ -93,7 +97,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - int processors = Runtime.getRuntime().availableProcessors(); + int processors = (Runtime.getRuntime().availableProcessors() * 4) + 1; _threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors))); _executor = Executors.newCachedThreadPool(Threading.getThreadFactory()); } @@ -130,7 +134,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN if (socket == null) { throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport"); + "with 'socket://<SocketID>' transport"); } _address = socket.getRemoteSocketAddress(); _connector = new ExistingSocketConnector(1, _executor); @@ -142,25 +146,26 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _log.info("Connecting to broker on: " + _address); - String s = "-"; + String name = "MINANetworkTransport(Client)"; StackTraceElement[] trace = Thread.currentThread().getStackTrace(); for (StackTraceElement elt : trace) { - if (elt.getClassName().contains("Test")) + if (elt.getClassName().endsWith("Test")) { - s += elt.getClassName(); - break; + name += "-" + elt.getClassName(); +// break; // FIXME } } - - IoServiceConfig cfg = _connector.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s)); - + + IoServiceConfig config = _connector.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + // Socket based connection configuration only (TCP/SOCKET) if (_connector instanceof SocketConnector) { - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) config.getSessionConfig(); scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + scfg.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); scfg.setSendBufferSize(sendBufferSize); @@ -173,7 +178,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } // Connect to the broker - ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg); + ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), config); future.join(); if (!future.isConnected()) { @@ -181,6 +186,14 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _session = future.getSession(); _session.setAttachment(_receiver); + + IoFilterChain chain = _session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_executor); + chain.addFirst("clientExecutor", filter); return new MinaNetworkConnection(_session); } @@ -191,9 +204,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new SocketAcceptor(_threads, _executor); - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setDisconnectOnUnbind(true); - SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig(); + SocketSessionConfig ssc = (SocketSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); ssc.setReuseAddress(true); ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); @@ -215,9 +226,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new DatagramAcceptor(_executor); - DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig(); - dconfig.setDisconnectOnUnbind(true); - DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig(); + DatagramSessionConfig dsc = (DatagramSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); dsc.setReuseAddress(true); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); @@ -235,16 +244,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } else if (settings.getProtocol().equalsIgnoreCase(Transport.VM)) { - _acceptor = new VmPipeAcceptor(); - _address = new VmPipeAddress(settings.getPort()); + _acceptor = new VmPipeAcceptor(); + _address = new VmPipeAddress(settings.getPort()); } else { throw new TransportException("Unknown protocol: " + settings.getProtocol()); } - IoServiceConfig cfg = _acceptor.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Broker)")); + IoAcceptorConfig config = (IoAcceptorConfig) _acceptor.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + config.setDisconnectOnUnbind(true); try { @@ -255,6 +265,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN throw new TransportException("Could not bind to " + _address, e); } } + + public Executor getExecutor() + { + return _executor; + } public SocketAddress getAddress() { @@ -275,6 +290,10 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _session.close(); } + if (_executor != null) + { + _executor.shutdownNow(); + } } public boolean isCompatible(String protocol) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index 5fc3032d35..10d70ed34f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -26,20 +26,16 @@ import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.qpid.transport.network.Transport; /** * MinaSender */ public class MinaSender implements Sender<java.nio.ByteBuffer> { - private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); - private final IoSession _session; - private WriteFuture _lastWrite; - private int _idleTimeout = 0; + private int _idle = 0; + private WriteFuture _written; public MinaSender(IoSession session) { @@ -52,41 +48,36 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> { throw new TransportException("attempted to write to a closed socket"); } - ByteBuffer mina = ByteBuffer.allocate(buf.capacity()); - mina.put(buf); - mina.flip(); - flush(); - _lastWrite = _session.write(mina); + _written = _session.write(ByteBuffer.wrap(buf)); } public synchronized void flush() { - if (_lastWrite != null) + if (_written != null) { - _lastWrite.join(); - if (!_lastWrite.isWritten()) - { - throw new RuntimeException("Error flushing buffer"); - } + _written.join(Transport.DEFAULT_TIMEOUT); + if (!_written.isWritten()) + { + throw new TransportException("Error flushing data buffer"); + } } } - public void close() + public synchronized void close() { - // MINA will sometimes throw away in-progress writes when you ask it to close flush(); CloseFuture closed = _session.close(); closed.join(); } - public void setIdleTimeout(int i) + public void setIdleTimeout(int idle) { - _idleTimeout = i; - _session.setWriteTimeout(_idleTimeout); + _idle = idle; + _session.setWriteTimeout(_idle); } public long getIdleTimeout() { - return _idleTimeout; + return _idle; } } |