diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java | 95 |
1 files changed, 70 insertions, 25 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 838a662402..42c8334a5d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -21,7 +21,11 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; -import java.net.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; import java.nio.ByteBuffer; import javax.net.ssl.SSLContext; @@ -29,16 +33,18 @@ import javax.net.ssl.SSLServerSocketFactory; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.*; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.IncomingNetworkTransport; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.util.Logger; +import org.slf4j.LoggerFactory; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport { - - private static final Logger LOGGER = Logger.get(IoNetworkTransport.class); + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class); private Socket _socket; private IoNetworkConnection _connection; @@ -58,10 +64,13 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet _socket.setSendBufferSize(sendBufferSize); _socket.setReceiveBufferSize(receiveBufferSize); - LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize()); - LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize()); - LOGGER.debug("TCP_NODELAY : %s", _socket.getTcpNoDelay()); - + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); + LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); + LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); + } + InetAddress address = InetAddress.getByName(settings.getHost()); _socket.connect(new InetSocketAddress(address, settings.getPort())); @@ -120,7 +129,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { _acceptor = new AcceptingThread(config, factory, sslContext); - + _acceptor.setDaemon(false); _acceptor.start(); } catch (IOException e) @@ -133,9 +142,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private class AcceptingThread extends Thread { + private volatile boolean _closed = false; private NetworkTransportConfiguration _config; private ProtocolEngineFactory _factory; - private SSLContext _sslContent; + private SSLContext _sslContext; private ServerSocket _serverSocket; private AcceptingThread(NetworkTransportConfiguration config, @@ -145,9 +155,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { _config = config; _factory = factory; - _sslContent = sslContext; + _sslContext = sslContext; - InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort()); + InetSocketAddress address = config.getAddress(); if(sslContext == null) { @@ -155,12 +165,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } else { - SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory(); + SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); _serverSocket = socketFactory.createServerSocket(); } - _serverSocket.bind(address); _serverSocket.setReuseAddress(true); + _serverSocket.bind(address); } @@ -171,6 +181,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet */ public void close() { + LOGGER.debug("Shutting down the Acceptor"); + _closed = true; + if (!_serverSocket.isClosed()) { try @@ -189,11 +202,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { try { - while (true) + while (!_closed) { + Socket socket = null; try { - Socket socket = _serverSocket.accept(); + socket = _serverSocket.accept(); socket.setTcpNoDelay(_config.getTcpNoDelay()); final Integer sendBufferSize = _config.getSendBufferSize(); @@ -206,27 +220,58 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout); - engine.setNetworkConnection(connection, connection.getSender()); connection.start(); - - } catch(RuntimeException e) { - LOGGER.error(e, "Error in Acceptor thread " + _config.getPort()); + LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + closeSocketIfNecessary(socket); + } + catch(IOException e) + { + if(!_closed) + { + LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + closeSocketIfNecessary(socket); + try + { + //Delay to avoid tight spinning the loop during issues such as too many open files + Thread.sleep(1000); + } + catch (InterruptedException ie) + { + LOGGER.debug("Stopping acceptor due to interrupt request"); + _closed = true; + } + } } } } - catch (IOException e) + finally { - LOGGER.debug(e, "SocketException - no new connections will be accepted on port " - + _config.getPort()); + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Acceptor exiting, no new connections will be accepted on port " + _config.getPort()); + } } } - + private void closeSocketIfNecessary(final Socket socket) + { + if(socket != null) + { + try + { + socket.close(); + } + catch (IOException e) + { + LOGGER.debug("Exception while closing socket", e); + } + } + } } } |