diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io')
4 files changed, 231 insertions, 38 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java new file mode 100644 index 0000000000..54a2a360bb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportActivity; + +class IdleTimeoutTicker implements Ticker +{ + private final TransportActivity _transport; + private final int _defaultTimeout; + private NetworkConnection _connection; + + public IdleTimeoutTicker(TransportActivity transport, int defaultTimeout) + { + _transport = transport; + _defaultTimeout = defaultTimeout; + } + + @Override + public int getTimeToNextTick(long currentTime) + { + long nextTime = -1; + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + + if(maxReadIdle > 0) + { + nextTime = _transport.getLastReadTime() + maxReadIdle; + } + + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + + if(maxWriteIdle > 0) + { + long writeTime = _transport.getLastWriteTime() + maxWriteIdle; + if(nextTime == -1l || writeTime < nextTime) + { + nextTime = writeTime; + } + } + return nextTime == -1 ? _defaultTimeout : (int) (nextTime - currentTime); + } + + @Override + public int tick(long currentTime) + { + // writer Idle + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime) + { + _transport.writerIdle(); + } + // reader Idle + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime) + { + + _transport.readerIdle(); + } + return getTimeToNextTick(currentTime); + } + + public void setConnection(NetworkConnection connection) + { + _connection = connection; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 2658296c5f..f5c09ac2cc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -26,7 +26,9 @@ import java.nio.ByteBuffer; import java.security.Principal; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.NetworkConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,23 @@ public class IoNetworkConnection implements NetworkConnection private final IoSender _ioSender; private final IoReceiver _ioReceiver; private Principal _principal; + private int _maxReadIdle; + private int _maxWriteIdle; public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, - int sendBufferSize, int receiveBufferSize, long timeout) + int sendBufferSize, int receiveBufferSize, long timeout) + { + this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null); + } + + public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, + int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) { _socket = socket; _timeout = timeout; _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioReceiver.setTicker(ticker); _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); @@ -88,14 +99,12 @@ public class IoNetworkConnection implements NetworkConnection public void setMaxWriteIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxWriteIdle = sec; } public void setMaxReadIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxReadIdle = sec; } @Override @@ -109,4 +118,16 @@ public class IoNetworkConnection implements NetworkConnection { return _principal; } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 9b6f0a0b1b..c8027e143e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -41,9 +41,8 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.*; + import org.slf4j.LoggerFactory; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport @@ -56,7 +55,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private IoNetworkConnection _connection; private AcceptingThread _acceptor; - public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext) + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity) { int sendBufferSize = settings.getWriteBufferSize(); int receiveBufferSize = settings.getReadBufferSize(); @@ -91,7 +92,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT); + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); + ticker.setConnection(_connection); _connection.start(); } catch(Exception e) @@ -128,9 +131,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet return _connection; } - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext) + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) { - try { _acceptor = new AcceptingThread(config, factory, sslContext); @@ -141,8 +145,6 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { throw new TransportException("Unable to start server socket", e); } - - } private class AcceptingThread extends Thread @@ -152,15 +154,16 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private ProtocolEngineFactory _factory; private SSLContext _sslContext; private ServerSocket _serverSocket; + private int _timeout; private AcceptingThread(NetworkTransportConfiguration config, ProtocolEngineFactory factory, - SSLContext sslContext) - throws IOException + SSLContext sslContext) throws IOException { _config = config; _factory = factory; _sslContext = sslContext; + _timeout = TIMEOUT; InetSocketAddress address = config.getAddress(); @@ -172,15 +175,19 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); _serverSocket = socketFactory.createServerSocket(); - ((SSLServerSocket)_serverSocket).setNeedClientAuth(config.needClientAuth()); - ((SSLServerSocket)_serverSocket).setWantClientAuth(config.wantClientAuth()); + if(config.needClientAuth()) + { + ((SSLServerSocket)_serverSocket).setNeedClientAuth(true); + } + else if(config.wantClientAuth()) + { + ((SSLServerSocket)_serverSocket).setWantClientAuth(true); + } } _serverSocket.setReuseAddress(true); _serverSocket.bind(address); - - } @@ -217,6 +224,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { socket = _serverSocket.accept(); socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(_timeout); final Integer sendBufferSize = _config.getSendBufferSize(); final Integer receiveBufferSize = _config.getReceiveBufferSize(); @@ -224,10 +232,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet socket.setSendBufferSize(sendBufferSize); socket.setReceiveBufferSize(receiveBufferSize); - ProtocolEngine engine = _factory.newProtocolEngine(); - NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT); + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, + ticker); + ticker.setConnection(connection); if(_sslContext != null) { @@ -248,14 +258,14 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } catch(RuntimeException e) { - LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); closeSocketIfNecessary(socket); } catch(IOException e) { if(!_closed) { - LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e); + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); closeSocketIfNecessary(socket); try { @@ -275,7 +285,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { if(LOGGER.isDebugEnabled()) { - LOGGER.debug("Acceptor exiting, no new connections will be accepted on port " + _config.getPort()); + LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress()); } } } @@ -294,6 +304,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } } } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 7e63071c16..06a43e21c6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; import javax.net.ssl.SSLSocket; @@ -31,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; private static final boolean shutdownBroken; + + private Ticker _ticker; static { String osName = System.getProperty("os.name"); @@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable { final int threshold = bufferSize / 2; - // I set the read buffer size simillar to SO_RCVBUF + // I set the read buffer size similar to SO_RCVBUF // Haven't tested with a lower value to see if it's better or worse byte[] buffer = new byte[bufferSize]; try @@ -144,27 +148,71 @@ final class IoReceiver implements Runnable, Closeable InputStream in = socket.getInputStream(); int read = 0; int offset = 0; - while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + long currentTime; + while(read != -1) { - if (read > 0) + try + { + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[bufferSize]; + } + } + currentTime = System.currentTimeMillis(); + + if(_ticker != null) + { + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } + try + { + if(!socket.isClosed()) + { + socket.setSoTimeout(tick <= 0 ? 1 : tick); + } + } + catch(SocketException e) + { + // ignore - closed socket + } + } + } + } + catch (SocketTimeoutException e) { - ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); - receiver.received(b); - offset+=read; - if (offset > threshold) + currentTime = System.currentTimeMillis(); + if(_ticker != null) { - offset = 0; - buffer = new byte[bufferSize]; + final int tick = _ticker.tick(currentTime); + if(!socket.isClosed()) + { + try + { + socket.setSoTimeout(tick <= 0 ? 1 : tick ); + } + catch(SocketException ex) + { + // ignore - closed socket + } + } } } } } catch (Throwable t) { - if (!(shutdownBroken && - t instanceof SocketException && - t.getMessage().equalsIgnoreCase("socket closed") && - closed.get())) + if (shouldReport(t)) { receiver.exception(t); } @@ -183,4 +231,30 @@ final class IoReceiver implements Runnable, Closeable } } + private boolean shouldReport(Throwable t) + { + boolean brokenClose = closed.get() && + shutdownBroken && + t instanceof SocketException && + "socket closed".equalsIgnoreCase(t.getMessage()); + + boolean sslSocketClosed = closed.get() && + socket instanceof SSLSocket && + t instanceof SocketException && + "Socket is closed".equalsIgnoreCase(t.getMessage()); + + return !brokenClose && !sslSocketClosed; + } + + public Ticker getTicker() + { + return _ticker; + } + + public void setTicker(Ticker ticker) + { + _ticker = ticker; + } + + } |