diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java | 100 |
1 files changed, 87 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 7e63071c16..06a43e21c6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; import javax.net.ssl.SSLSocket; @@ -31,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; private static final boolean shutdownBroken; + + private Ticker _ticker; static { String osName = System.getProperty("os.name"); @@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable { final int threshold = bufferSize / 2; - // I set the read buffer size simillar to SO_RCVBUF + // I set the read buffer size similar to SO_RCVBUF // Haven't tested with a lower value to see if it's better or worse byte[] buffer = new byte[bufferSize]; try @@ -144,27 +148,71 @@ final class IoReceiver implements Runnable, Closeable InputStream in = socket.getInputStream(); int read = 0; int offset = 0; - while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + long currentTime; + while(read != -1) { - if (read > 0) + try + { + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[bufferSize]; + } + } + currentTime = System.currentTimeMillis(); + + if(_ticker != null) + { + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } + try + { + if(!socket.isClosed()) + { + socket.setSoTimeout(tick <= 0 ? 1 : tick); + } + } + catch(SocketException e) + { + // ignore - closed socket + } + } + } + } + catch (SocketTimeoutException e) { - ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); - receiver.received(b); - offset+=read; - if (offset > threshold) + currentTime = System.currentTimeMillis(); + if(_ticker != null) { - offset = 0; - buffer = new byte[bufferSize]; + final int tick = _ticker.tick(currentTime); + if(!socket.isClosed()) + { + try + { + socket.setSoTimeout(tick <= 0 ? 1 : tick ); + } + catch(SocketException ex) + { + // ignore - closed socket + } + } } } } } catch (Throwable t) { - if (!(shutdownBroken && - t instanceof SocketException && - t.getMessage().equalsIgnoreCase("socket closed") && - closed.get())) + if (shouldReport(t)) { receiver.exception(t); } @@ -183,4 +231,30 @@ final class IoReceiver implements Runnable, Closeable } } + private boolean shouldReport(Throwable t) + { + boolean brokenClose = closed.get() && + shutdownBroken && + t instanceof SocketException && + "socket closed".equalsIgnoreCase(t.getMessage()); + + boolean sslSocketClosed = closed.get() && + socket instanceof SSLSocket && + t instanceof SocketException && + "Socket is closed".equalsIgnoreCase(t.getMessage()); + + return !brokenClose && !sslSocketClosed; + } + + public Ticker getTicker() + { + return _ticker; + } + + public void setTicker(Ticker ticker) + { + _ticker = ticker; + } + + } |