summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java95
1 files changed, 70 insertions, 25 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 838a662402..42c8334a5d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -21,7 +21,11 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
-import java.net.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
@@ -29,16 +33,18 @@ import javax.net.ssl.SSLServerSocketFactory;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.util.Logger;
+import org.slf4j.LoggerFactory;
public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
{
-
- private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
private Socket _socket;
private IoNetworkConnection _connection;
@@ -58,10 +64,13 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
_socket.setSendBufferSize(sendBufferSize);
_socket.setReceiveBufferSize(receiveBufferSize);
- LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
- LOGGER.debug("TCP_NODELAY : %s", _socket.getTcpNoDelay());
-
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
+ LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+ }
+
InetAddress address = InetAddress.getByName(settings.getHost());
_socket.connect(new InetSocketAddress(address, settings.getPort()));
@@ -120,7 +129,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
try
{
_acceptor = new AcceptingThread(config, factory, sslContext);
-
+ _acceptor.setDaemon(false);
_acceptor.start();
}
catch (IOException e)
@@ -133,9 +142,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
private class AcceptingThread extends Thread
{
+ private volatile boolean _closed = false;
private NetworkTransportConfiguration _config;
private ProtocolEngineFactory _factory;
- private SSLContext _sslContent;
+ private SSLContext _sslContext;
private ServerSocket _serverSocket;
private AcceptingThread(NetworkTransportConfiguration config,
@@ -145,9 +155,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
_config = config;
_factory = factory;
- _sslContent = sslContext;
+ _sslContext = sslContext;
- InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
+ InetSocketAddress address = config.getAddress();
if(sslContext == null)
{
@@ -155,12 +165,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
}
else
{
- SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
+ SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
_serverSocket = socketFactory.createServerSocket();
}
- _serverSocket.bind(address);
_serverSocket.setReuseAddress(true);
+ _serverSocket.bind(address);
}
@@ -171,6 +181,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
*/
public void close()
{
+ LOGGER.debug("Shutting down the Acceptor");
+ _closed = true;
+
if (!_serverSocket.isClosed())
{
try
@@ -189,11 +202,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
try
{
- while (true)
+ while (!_closed)
{
+ Socket socket = null;
try
{
- Socket socket = _serverSocket.accept();
+ socket = _serverSocket.accept();
socket.setTcpNoDelay(_config.getTcpNoDelay());
final Integer sendBufferSize = _config.getSendBufferSize();
@@ -206,27 +220,58 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
-
engine.setNetworkConnection(connection, connection.getSender());
connection.start();
-
-
}
catch(RuntimeException e)
{
- LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
+ LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e);
+ closeSocketIfNecessary(socket);
+ }
+ catch(IOException e)
+ {
+ if(!_closed)
+ {
+ LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e);
+ closeSocketIfNecessary(socket);
+ try
+ {
+ //Delay to avoid tight spinning the loop during issues such as too many open files
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.debug("Stopping acceptor due to interrupt request");
+ _closed = true;
+ }
+ }
}
}
}
- catch (IOException e)
+ finally
{
- LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
- + _config.getPort());
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Acceptor exiting, no new connections will be accepted on port " + _config.getPort());
+ }
}
}
-
+ private void closeSocketIfNecessary(final Socket socket)
+ {
+ if(socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Exception while closing socket", e);
+ }
+ }
+ }
}
}