From 80ba5c5efdc23e3922b8f8f5152ceeaefa6951b6 Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Mon, 6 Dec 2010 16:05:46 +0000 Subject: Attempt one at merge from r1021441:HEAD git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1042697 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/common/src/main/java/common.bnd | 21 ++++++- .../apache/qpid/transport/network/Assembler.java | 34 +++++------ .../qpid/transport/network/Disassembler.java | 60 ++++++++----------- .../apache/qpid/transport/network/Transport.java | 35 +++++++---- .../transport/network/io/IoNetworkTransport.java | 8 +-- .../transport/network/mina/MinaNetworkHandler.java | 19 ++++-- .../network/mina/MinaNetworkTransport.java | 69 ++++++++++++++-------- .../qpid/transport/network/mina/MinaSender.java | 39 +++++------- 8 files changed, 159 insertions(+), 126 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/common.bnd b/qpid/java/common/src/main/java/common.bnd index 6cd8a52976..ef56ecec9e 100755 --- a/qpid/java/common/src/main/java/common.bnd +++ b/qpid/java/common/src/main/java/common.bnd @@ -1,4 +1,23 @@ -ver: 0.7.0 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +ver: 0.9.0 Bundle-SymbolicName: qpid-common Bundle-Version: ${ver} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 37e731206c..a4db16742a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,13 +20,11 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -35,19 +33,16 @@ import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Struct; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.codec.BBDecoder; /** * Assembler */ public class Assembler implements Receiver, NetworkDelegate { - private static final Logger _log = LoggerFactory.getLogger(Assembler.class); - private final Receiver receiver; - private final Map> segments; - private final Method[] incomplete; + private final Map> segments; + private final Map incomplete; private final ThreadLocal decoder = new ThreadLocal() { public BBDecoder initialValue() @@ -59,8 +54,9 @@ public class Assembler implements Receiver, NetworkDelegate public Assembler(Receiver receiver) { this.receiver = receiver; - segments = new HashMap>(); - incomplete = new Method[64*1024]; + segments = new HashMap>(); + incomplete = new HashMap(); +// incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -102,12 +98,12 @@ public class Assembler implements Receiver, NetworkDelegate public void exception(Throwable t) { - this.receiver.exception(t); + receiver.exception(t); } public void closed() { - this.receiver.closed(); + receiver.closed(); } public void init(ProtocolHeader header) @@ -188,7 +184,7 @@ public class Assembler implements Receiver, NetworkDelegate command.read(dec); if (command.hasPayload()) { - incomplete[channel] = command; + incomplete.put(channel, command); } else { @@ -196,8 +192,8 @@ public class Assembler implements Receiver, NetworkDelegate } break; case HEADER: - command = incomplete[channel]; - List structs = new ArrayList(2); + command = incomplete.get(channel); + List structs = new ArrayList(2); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); @@ -205,14 +201,14 @@ public class Assembler implements Receiver, NetworkDelegate command.setHeader(new Header(structs)); if (frame.isLastSegment()) { - incomplete[channel] = null; + incomplete.remove(channel); emit(channel, command); } break; case BODY: - command = incomplete[channel]; + command = incomplete.get(channel); command.setBody(segment); - incomplete[channel] = null; + incomplete.remove(channel); emit(channel, command); break; default: diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 87cabeb874..08b3fae528 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,9 +20,15 @@ */ package org.apache.qpid.transport.network; -import static org.apache.qpid.transport.network.Frame.*; - import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -35,19 +41,14 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - /** * Disassembler converts protocol events to byte buffers that can be sent on the network. */ public final class Disassembler implements Sender, ProtocolDelegate { - private final Sender sender; private final int maxPayload; - private final ByteBuffer header; private final Object sendlock = new Object(); private final ThreadLocal encoder = new ThreadLocal() { @@ -66,8 +67,6 @@ public final class Disassembler implements Sender, } this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; - this.header = ByteBuffer.allocate(HEADER_SIZE); - this.header.order(ByteOrder.BIG_ENDIAN); } @@ -78,39 +77,35 @@ public final class Disassembler implements Sender, public void flush() { - synchronized (sendlock) - { - sender.flush(); - } + sender.flush(); } public void close() { - synchronized (sendlock) - { - sender.close(); - } + sender.close(); } private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf) { synchronized (sendlock) { - header.put(0, flags); - header.put(1, type); - header.putShort(2, (short) (size + HEADER_SIZE)); - header.put(5, track); - header.putShort(6, (short) channel); - - header.rewind(); - - sender.send(header); - sender.flush(); + ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE); + data.order(ByteOrder.BIG_ENDIAN); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + data.position(HEADER_SIZE); int limit = buf.limit(); buf.limit(buf.position() + size); - sender.send(buf); + data.put(buf); buf.limit(limit); + + data.rewind(); + sender.send(data); } } @@ -166,14 +161,6 @@ public final class Disassembler implements Sender, method(method, SegmentType.COMMAND); } - private ByteBuffer copy(ByteBuffer src) - { - ByteBuffer buf = ByteBuffer.allocate(src.remaining()); - buf.put(src); - buf.flip(); - return buf; - } - private void method(Method method, SegmentType type) { BBEncoder enc = encoder.get(); @@ -228,7 +215,6 @@ public final class Disassembler implements Sender, { fragment(LAST_SEG, SegmentType.BODY, method, body); } - } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index bb7f059d15..c17527c19c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.transport.network; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.apache.qpid.transport.TransportException; @@ -34,7 +34,7 @@ public class Transport public static final String UDP = "udp"; public static final String VM = "vm"; public static final String SOCKET = "socket"; - public static final String MULTICAST = "multicast"; + public static final String MULTICAST = "multicast"; // TODO public static final int DEFAULT_BUFFER_SIZE = 32 * 1024; public static final long DEFAULT_TIMEOUT = 60000; @@ -43,20 +43,35 @@ public class Transport public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; public static final String IO_TRANSPORT = "org.apache.qpid.transport.network.io.IoNetworkTransport"; - public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; - public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; + public static final String NIO_TRANSPORT = "org.apache.qpid.transport.network.nio.NioNetworkTransport"; // TODO + public static final String NETTY_TRANSPORT = "org.apache.qpid.transport.network.netty.NettyNetworkTransport"; // TODO - private static final List _incoming = new ArrayList(); - private static final List _outgoing = new ArrayList(); + private static final List _incoming = new LinkedList(); + private static final List _outgoing = new LinkedList(); public static void registerIncomingTransport(Class transport) { - _incoming.add(transport.getName()); + registerTransport(_incoming, transport.getName()); + } + + public static void registerIncomingTransport(String transport) + { + registerTransport(_incoming, transport); } public static void registerOutgoingTransport(Class transport) { - _outgoing.add(transport.getName()); + registerTransport(_outgoing, transport.getName()); + } + + public static void registerOutgoingTransport(String transport) + { + registerTransport(_outgoing, transport); + } + + private static void registerTransport(List registered, String transport) + { + registered.add(transport); } public static IncomingNetworkTransport getIncomingTransport() throws TransportException @@ -71,7 +86,7 @@ public class Transport public static OutgoingNetworkTransport getOutgoingTransport(String protocol) throws TransportException { - return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, MINA_TRANSPORT, protocol); + return (OutgoingNetworkTransport) getTransport("outgoing", _outgoing, IO_TRANSPORT, protocol); } private static NetworkTransport getTransport(String direction, List registered, String defaultTransport, String protocol) @@ -95,7 +110,7 @@ public class Transport try { - String transport = System.getProperty("qpid.transport." + direction, MINA_TRANSPORT); + String transport = System.getProperty("qpid.transport." + direction, defaultTransport); Class clazz = Class.forName(transport); NetworkTransport network = (NetworkTransport) clazz.newInstance(); if (protocol == null || network.isCompatible(protocol)) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index aa480554ea..0aee08adbe 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -66,8 +66,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport { _socket = new Socket(); - _log.debug("default-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - _log.debug("default-SO_SNDBUF : %s", _socket.getSendBufferSize()); + _log.debug("default SO_RCVBUF " + _socket.getReceiveBufferSize()); + _log.debug("default SO_SNDBUF " + _socket.getSendBufferSize()); _socket.setTcpNoDelay(noDelay); _socket.setKeepAlive(keepAlive); @@ -75,8 +75,8 @@ public class IoNetworkTransport implements OutgoingNetworkTransport _socket.setReceiveBufferSize(receiveBufferSize); _socket.setReuseAddress(true); - _log.debug("new-SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - _log.debug("new-SO_SNDBUF : %s", _socket.getSendBufferSize()); + _log.debug("new SO_RCVBUF " + _socket.getReceiveBufferSize()); + _log.debug("new SO_SNDBUF " + _socket.getSendBufferSize()); InetAddress address = InetAddress.getByName(settings.getHost()); _socket.connect(new InetSocketAddress(address, settings.getPort())); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java index babfc3d698..d53031e21b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -24,19 +24,21 @@ import static org.apache.qpid.transport.util.Functions.*; import static org.apache.qpid.configuration.ClientProperties.*; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.LoggingFilter; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.util.SessionUtil; import org.apache.qpid.protocol.ReceiverFactory; import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,12 +49,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter { private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class); - private NetworkTransport _transport = null; + private MinaNetworkTransport _transport = null; private SSLContextFactory _sslFactory = null; private ReceiverFactory _factory = null; private boolean _debug = false; - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory, ReceiverFactory factory) { _transport = transport; _sslFactory = sslFactory; @@ -60,7 +62,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter _debug = Boolean.getBoolean("amqj.protocol.debug"); } - public MinaNetworkHandler(NetworkTransport transport, SSLContextFactory sslFactory) + public MinaNetworkHandler(MinaNetworkTransport transport, SSLContextFactory sslFactory) { this(transport, sslFactory, null); } @@ -83,6 +85,7 @@ public class MinaNetworkHandler extends IoHandlerAdapter public void exceptionCaught(IoSession ssn, Throwable e) { Receiver receiver = (Receiver) ssn.getAttachment(); + _log.error("Caught exception in transport layer", e); receiver.exception(e); } @@ -100,6 +103,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter SessionUtil.initialize(session); IoFilterChain chain = session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_transport.getExecutor()); + chain.addFirst("sessionExecutor", filter); // Add SSL filter if (_sslFactory != null) @@ -158,8 +167,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter { _log.info("Idle MINA session: " + System.identityHashCode(session)); session.close(); - Receiver receiver = (Receiver) session.getAttachment(); - receiver.closed(); } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java index 2010b2dd93..ac1b959de7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -28,23 +28,27 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoAcceptorConfig; import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoFilterAdapter; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.common.PooledByteBufferAllocator; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.ThreadModel; +import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.transport.socket.nio.DatagramAcceptor; -import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig; import org.apache.mina.transport.socket.nio.DatagramConnector; import org.apache.mina.transport.socket.nio.DatagramSessionConfig; import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; @@ -71,7 +75,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN public static final List SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM); private int _threads; - private Executor _executor; + private ExecutorService _executor; private ConnectionSettings _settings; private SocketAddress _address; private IoConnector _connector; @@ -93,7 +97,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - int processors = Runtime.getRuntime().availableProcessors(); + int processors = (Runtime.getRuntime().availableProcessors() * 4) + 1; _threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors))); _executor = Executors.newCachedThreadPool(Threading.getThreadFactory()); } @@ -130,7 +134,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN if (socket == null) { throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://' transport"); + "with 'socket://' transport"); } _address = socket.getRemoteSocketAddress(); _connector = new ExistingSocketConnector(1, _executor); @@ -142,25 +146,26 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _log.info("Connecting to broker on: " + _address); - String s = "-"; + String name = "MINANetworkTransport(Client)"; StackTraceElement[] trace = Thread.currentThread().getStackTrace(); for (StackTraceElement elt : trace) { - if (elt.getClassName().contains("Test")) + if (elt.getClassName().endsWith("Test")) { - s += elt.getClassName(); - break; + name += "-" + elt.getClassName(); +// break; // FIXME } } - - IoServiceConfig cfg = _connector.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Client)" + s)); - + + IoServiceConfig config = _connector.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + // Socket based connection configuration only (TCP/SOCKET) if (_connector instanceof SocketConnector) { - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) config.getSessionConfig(); scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); + scfg.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); scfg.setSendBufferSize(sendBufferSize); @@ -173,7 +178,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } // Connect to the broker - ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), cfg); + ConnectFuture future = _connector.connect(_address, new MinaNetworkHandler(this, sslFactory), config); future.join(); if (!future.isConnected()) { @@ -181,6 +186,14 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } _session = future.getSession(); _session.setAttachment(_receiver); + + IoFilterChain chain = _session.getFilterChain(); + if (chain.contains(ExecutorThreadModel.class.getName())) + { + chain.remove(ExecutorThreadModel.class.getName()); + } + IoFilterAdapter filter = new ExecutorFilter(_executor); + chain.addFirst("clientExecutor", filter); return new MinaNetworkConnection(_session); } @@ -191,9 +204,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new SocketAcceptor(_threads, _executor); - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setDisconnectOnUnbind(true); - SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig(); + SocketSessionConfig ssc = (SocketSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); ssc.setReuseAddress(true); ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); @@ -215,9 +226,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _acceptor = new DatagramAcceptor(_executor); - DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig(); - dconfig.setDisconnectOnUnbind(true); - DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig(); + DatagramSessionConfig dsc = (DatagramSessionConfig) _acceptor.getDefaultConfig().getSessionConfig(); dsc.setReuseAddress(true); Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); @@ -235,16 +244,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN } else if (settings.getProtocol().equalsIgnoreCase(Transport.VM)) { - _acceptor = new VmPipeAcceptor(); - _address = new VmPipeAddress(settings.getPort()); + _acceptor = new VmPipeAcceptor(); + _address = new VmPipeAddress(settings.getPort()); } else { throw new TransportException("Unknown protocol: " + settings.getProtocol()); } - IoServiceConfig cfg = _acceptor.getDefaultConfig(); - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkTransport(Broker)")); + IoAcceptorConfig config = (IoAcceptorConfig) _acceptor.getDefaultConfig(); + config.setThreadModel(ThreadModel.MANUAL); + config.setDisconnectOnUnbind(true); try { @@ -255,6 +265,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN throw new TransportException("Could not bind to " + _address, e); } } + + public Executor getExecutor() + { + return _executor; + } public SocketAddress getAddress() { @@ -275,6 +290,10 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _session.close(); } + if (_executor != null) + { + _executor.shutdownNow(); + } } public boolean isCompatible(String protocol) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index 5fc3032d35..10d70ed34f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -26,20 +26,16 @@ import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.qpid.transport.network.Transport; /** * MinaSender */ public class MinaSender implements Sender { - private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); - private final IoSession _session; - private WriteFuture _lastWrite; - private int _idleTimeout = 0; + private int _idle = 0; + private WriteFuture _written; public MinaSender(IoSession session) { @@ -52,41 +48,36 @@ public class MinaSender implements Sender { throw new TransportException("attempted to write to a closed socket"); } - ByteBuffer mina = ByteBuffer.allocate(buf.capacity()); - mina.put(buf); - mina.flip(); - flush(); - _lastWrite = _session.write(mina); + _written = _session.write(ByteBuffer.wrap(buf)); } public synchronized void flush() { - if (_lastWrite != null) + if (_written != null) { - _lastWrite.join(); - if (!_lastWrite.isWritten()) - { - throw new RuntimeException("Error flushing buffer"); - } + _written.join(Transport.DEFAULT_TIMEOUT); + if (!_written.isWritten()) + { + throw new TransportException("Error flushing data buffer"); + } } } - public void close() + public synchronized void close() { - // MINA will sometimes throw away in-progress writes when you ask it to close flush(); CloseFuture closed = _session.close(); closed.join(); } - public void setIdleTimeout(int i) + public void setIdleTimeout(int idle) { - _idleTimeout = i; - _session.setWriteTimeout(_idleTimeout); + _idle = idle; + _session.setWriteTimeout(_idle); } public long getIdleTimeout() { - return _idleTimeout; + return _idle; } } -- cgit v1.2.1