summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java100
1 files changed, 87 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 7e63071c16..06a43e21c6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.util.Logger;
import javax.net.ssl.SSLSocket;
@@ -31,6 +32,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
private static final boolean shutdownBroken;
+
+ private Ticker _ticker;
static
{
String osName = System.getProperty("os.name");
@@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable
{
final int threshold = bufferSize / 2;
- // I set the read buffer size simillar to SO_RCVBUF
+ // I set the read buffer size similar to SO_RCVBUF
// Haven't tested with a lower value to see if it's better or worse
byte[] buffer = new byte[bufferSize];
try
@@ -144,27 +148,71 @@ final class IoReceiver implements Runnable, Closeable
InputStream in = socket.getInputStream();
int read = 0;
int offset = 0;
- while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ long currentTime;
+ while(read != -1)
{
- if (read > 0)
+ try
+ {
+ while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ {
+ if (read > 0)
+ {
+ ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+ receiver.received(b);
+ offset+=read;
+ if (offset > threshold)
+ {
+ offset = 0;
+ buffer = new byte[bufferSize];
+ }
+ }
+ currentTime = System.currentTimeMillis();
+
+ if(_ticker != null)
+ {
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
+ try
+ {
+ if(!socket.isClosed())
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick);
+ }
+ }
+ catch(SocketException e)
+ {
+ // ignore - closed socket
+ }
+ }
+ }
+ }
+ catch (SocketTimeoutException e)
{
- ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
- receiver.received(b);
- offset+=read;
- if (offset > threshold)
+ currentTime = System.currentTimeMillis();
+ if(_ticker != null)
{
- offset = 0;
- buffer = new byte[bufferSize];
+ final int tick = _ticker.tick(currentTime);
+ if(!socket.isClosed())
+ {
+ try
+ {
+ socket.setSoTimeout(tick <= 0 ? 1 : tick );
+ }
+ catch(SocketException ex)
+ {
+ // ignore - closed socket
+ }
+ }
}
}
}
}
catch (Throwable t)
{
- if (!(shutdownBroken &&
- t instanceof SocketException &&
- t.getMessage().equalsIgnoreCase("socket closed") &&
- closed.get()))
+ if (shouldReport(t))
{
receiver.exception(t);
}
@@ -183,4 +231,30 @@ final class IoReceiver implements Runnable, Closeable
}
}
+ private boolean shouldReport(Throwable t)
+ {
+ boolean brokenClose = closed.get() &&
+ shutdownBroken &&
+ t instanceof SocketException &&
+ "socket closed".equalsIgnoreCase(t.getMessage());
+
+ boolean sslSocketClosed = closed.get() &&
+ socket instanceof SSLSocket &&
+ t instanceof SocketException &&
+ "Socket is closed".equalsIgnoreCase(t.getMessage());
+
+ return !brokenClose && !sslSocketClosed;
+ }
+
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public void setTicker(Ticker ticker)
+ {
+ _ticker = ticker;
+ }
+
+
}