summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/transport/network/io
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java87
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java100
4 files changed, 231 insertions, 38 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
new file mode 100644
index 0000000000..54a2a360bb
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportActivity;
+
+class IdleTimeoutTicker implements Ticker
+{
+ private final TransportActivity _transport;
+ private final int _defaultTimeout;
+ private NetworkConnection _connection;
+
+ public IdleTimeoutTicker(TransportActivity transport, int defaultTimeout)
+ {
+ _transport = transport;
+ _defaultTimeout = defaultTimeout;
+ }
+
+ @Override
+ public int getTimeToNextTick(long currentTime)
+ {
+ long nextTime = -1;
+ final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
+
+ if(maxReadIdle > 0)
+ {
+ nextTime = _transport.getLastReadTime() + maxReadIdle;
+ }
+
+ long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
+
+ if(maxWriteIdle > 0)
+ {
+ long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
+ if(nextTime == -1l || writeTime < nextTime)
+ {
+ nextTime = writeTime;
+ }
+ }
+ return nextTime == -1 ? _defaultTimeout : (int) (nextTime - currentTime);
+ }
+
+ @Override
+ public int tick(long currentTime)
+ {
+ // writer Idle
+ long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
+ if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime)
+ {
+ _transport.writerIdle();
+ }
+ // reader Idle
+ final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
+ if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime)
+ {
+
+ _transport.readerIdle();
+ }
+ return getTimeToNextTick(currentTime);
+ }
+
+ public void setConnection(NetworkConnection connection)
+ {
+ _connection = connection;
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 2658296c5f..f5c09ac2cc 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,14 +40,23 @@ public class IoNetworkConnection implements NetworkConnection
private final IoSender _ioSender;
private final IoReceiver _ioReceiver;
private Principal _principal;
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
- int sendBufferSize, int receiveBufferSize, long timeout)
+ int sendBufferSize, int receiveBufferSize, long timeout)
+ {
+ this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null);
+ }
+
+ public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
{
_socket = socket;
_timeout = timeout;
_ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+ _ioReceiver.setTicker(ticker);
_ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
@@ -88,14 +99,12 @@ public class IoNetworkConnection implements NetworkConnection
public void setMaxWriteIdle(int sec)
{
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
+ _maxWriteIdle = sec;
}
public void setMaxReadIdle(int sec)
{
- // TODO implement support for setting heartbeating config in this way
- // Currently a socket timeout is used in IoSender
+ _maxReadIdle = sec;
}
@Override
@@ -109,4 +118,16 @@ public class IoNetworkConnection implements NetworkConnection
{
return _principal;
}
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index 9b6f0a0b1b..c8027e143e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -41,9 +41,8 @@ import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.*;
+
import org.slf4j.LoggerFactory;
public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
@@ -56,7 +55,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
private IoNetworkConnection _connection;
private AcceptingThread _acceptor;
- public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity)
{
int sendBufferSize = settings.getWriteBufferSize();
int receiveBufferSize = settings.getReadBufferSize();
@@ -91,7 +92,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
try
{
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT);
+ IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+ _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+ ticker.setConnection(_connection);
_connection.start();
}
catch(Exception e)
@@ -128,9 +131,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
return _connection;
}
- public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
{
-
try
{
_acceptor = new AcceptingThread(config, factory, sslContext);
@@ -141,8 +145,6 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
throw new TransportException("Unable to start server socket", e);
}
-
-
}
private class AcceptingThread extends Thread
@@ -152,15 +154,16 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
private ProtocolEngineFactory _factory;
private SSLContext _sslContext;
private ServerSocket _serverSocket;
+ private int _timeout;
private AcceptingThread(NetworkTransportConfiguration config,
ProtocolEngineFactory factory,
- SSLContext sslContext)
- throws IOException
+ SSLContext sslContext) throws IOException
{
_config = config;
_factory = factory;
_sslContext = sslContext;
+ _timeout = TIMEOUT;
InetSocketAddress address = config.getAddress();
@@ -172,15 +175,19 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
_serverSocket = socketFactory.createServerSocket();
- ((SSLServerSocket)_serverSocket).setNeedClientAuth(config.needClientAuth());
- ((SSLServerSocket)_serverSocket).setWantClientAuth(config.wantClientAuth());
+ if(config.needClientAuth())
+ {
+ ((SSLServerSocket)_serverSocket).setNeedClientAuth(true);
+ }
+ else if(config.wantClientAuth())
+ {
+ ((SSLServerSocket)_serverSocket).setWantClientAuth(true);
+ }
}
_serverSocket.setReuseAddress(true);
_serverSocket.bind(address);
-
-
}
@@ -217,6 +224,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
socket = _serverSocket.accept();
socket.setTcpNoDelay(_config.getTcpNoDelay());
+ socket.setSoTimeout(_timeout);
final Integer sendBufferSize = _config.getSendBufferSize();
final Integer receiveBufferSize = _config.getReceiveBufferSize();
@@ -224,10 +232,12 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
socket.setSendBufferSize(sendBufferSize);
socket.setReceiveBufferSize(receiveBufferSize);
-
ProtocolEngine engine = _factory.newProtocolEngine();
- NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT);
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+ NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+ ticker);
+ ticker.setConnection(connection);
if(_sslContext != null)
{
@@ -248,14 +258,14 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
}
catch(RuntimeException e)
{
- LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e);
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
closeSocketIfNecessary(socket);
}
catch(IOException e)
{
if(!_closed)
{
- LOGGER.error("Error in Acceptor thread on port " + _config.getPort(), e);
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
closeSocketIfNecessary(socket);
try
{
@@ -275,7 +285,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
{
if(LOGGER.isDebugEnabled())
{
- LOGGER.debug("Acceptor exiting, no new connections will be accepted on port " + _config.getPort());
+ LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
}
}
}
@@ -294,6 +304,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet
}
}
}
+
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 7e63071c16..06a43e21c6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
import javax.net.ssl.SSLSocket;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
private static final boolean shutdownBroken;
+
+ private Ticker _ticker;
static
{
String osName = System.getProperty("os.name");
@@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable
{
final int threshold = bufferSize / 2;
- // I set the read buffer size simillar to SO_RCVBUF
+ // I set the read buffer size similar to SO_RCVBUF
// Haven't tested with a lower value to see if it's better or worse
byte[] buffer = new byte[bufferSize];
try
@@ -144,27 +148,71 @@ final class IoReceiver implements Runnable, Closeable
InputStream in = socket.getInputStream();
int read = 0;
int offset = 0;
- while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ long currentTime;
+ while(read != -1)
{
- if (read > 0)
+ try
+ {
+ while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ {
+ if (read > 0)
+ {
+ ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+ receiver.received(b);
+ offset+=read;
+ if (offset > threshold)
+ {
+ offset = 0;
+ buffer = new byte[bufferSize];
+ }
+ }
+ currentTime = System.currentTimeMillis();
+
+ if(_ticker != null)
+ {
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
+ try
+ {
+ if(!socket.isClosed())
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick);
+ }
+ }
+ catch(SocketException e)
+ {
+ // ignore - closed socket
+ }
+ }
+ }
+ }
+ catch (SocketTimeoutException e)
{
- ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
- receiver.received(b);
- offset+=read;
- if (offset > threshold)
+ currentTime = System.currentTimeMillis();
+ if(_ticker != null)
{
- offset = 0;
- buffer = new byte[bufferSize];
+ final int tick = _ticker.tick(currentTime);
+ if(!socket.isClosed())
+ {
+ try
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick );
+ }
+ catch(SocketException ex)
+ {
+ // ignore - closed socket
+ }
+ }
}
}
}
}
catch (Throwable t)
{
- if (!(shutdownBroken &&
- t instanceof SocketException &&
- t.getMessage().equalsIgnoreCase("socket closed") &&
- closed.get()))
+ if (shouldReport(t))
{
receiver.exception(t);
}
@@ -183,4 +231,30 @@ final class IoReceiver implements Runnable, Closeable
}
}
+ private boolean shouldReport(Throwable t)
+ {
+ boolean brokenClose = closed.get() &&
+ shutdownBroken &&
+ t instanceof SocketException &&
+ "socket closed".equalsIgnoreCase(t.getMessage());
+
+ boolean sslSocketClosed = closed.get() &&
+ socket instanceof SSLSocket &&
+ t instanceof SocketException &&
+ "Socket is closed".equalsIgnoreCase(t.getMessage());
+
+ return !brokenClose && !sslSocketClosed;
+ }
+
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public void setTicker(Ticker ticker)
+ {
+ _ticker = ticker;
+ }
+
+
}