summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java52
1 files changed, 39 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index a1fb0371fd..6144edb947 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -20,13 +20,15 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.Socket;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,7 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
-final class IoReceiver extends Thread
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
@@ -46,6 +48,9 @@ final class IoReceiver extends Thread
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Thread receiverThread;
+ private final boolean shutdownBroken =
+ ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
int bufferSize, long timeout)
@@ -55,19 +60,27 @@ final class IoReceiver extends Thread
this.bufferSize = bufferSize;
this.socket = transport.getSocket();
this.timeout = timeout;
-
- setDaemon(true);
- setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
- start();
+
+ try
+ {
+ receiverThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOReceiver thread",e);
+ }
+ receiverThread.setDaemon(true);
+ receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ receiverThread.start();
}
- void close()
+ void close(boolean block)
{
if (!closed.getAndSet(true))
{
try
{
- if (((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"))
+ if (shutdownBroken)
{
socket.close();
}
@@ -75,10 +88,10 @@ final class IoReceiver extends Thread
{
socket.shutdownInput();
}
- if (Thread.currentThread() != this)
+ if (block && Thread.currentThread() != receiverThread)
{
- join(timeout);
- if (isAlive())
+ receiverThread.join(timeout);
+ if (receiverThread.isAlive())
{
throw new TransportException("join timed out");
}
@@ -124,12 +137,25 @@ final class IoReceiver extends Thread
}
catch (Throwable t)
{
- receiver.exception(t);
+ if (!(shutdownBroken &&
+ t instanceof SocketException &&
+ t.getMessage().equalsIgnoreCase("socket closed") &&
+ closed.get()))
+ {
+ receiver.exception(t);
+ }
}
finally
{
receiver.closed();
- transport.getSender().close();
+ try
+ {
+ socket.close();
+ }
+ catch(Exception e)
+ {
+ log.warn(e, "Error closing socket");
+ }
}
}