diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java | 52 |
1 files changed, 39 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index a1fb0371fd..6144edb947 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,13 +20,15 @@ */ package org.apache.qpid.transport.network.io; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import java.net.Socket; +import java.net.SocketException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,7 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ -final class IoReceiver extends Thread +final class IoReceiver implements Runnable { private static final Logger log = Logger.get(IoReceiver.class); @@ -46,6 +48,9 @@ final class IoReceiver extends Thread private final Socket socket; private final long timeout; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Thread receiverThread; + private final boolean shutdownBroken = + ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) @@ -55,19 +60,27 @@ final class IoReceiver extends Thread this.bufferSize = bufferSize; this.socket = transport.getSocket(); this.timeout = timeout; - - setDaemon(true); - setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); - start(); + + try + { + receiverThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOReceiver thread",e); + } + receiverThread.setDaemon(true); + receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + receiverThread.start(); } - void close() + void close(boolean block) { if (!closed.getAndSet(true)) { try { - if (((String) System.getProperties().get("os.name")).matches("(?i).*windows.*")) + if (shutdownBroken) { socket.close(); } @@ -75,10 +88,10 @@ final class IoReceiver extends Thread { socket.shutdownInput(); } - if (Thread.currentThread() != this) + if (block && Thread.currentThread() != receiverThread) { - join(timeout); - if (isAlive()) + receiverThread.join(timeout); + if (receiverThread.isAlive()) { throw new TransportException("join timed out"); } @@ -124,12 +137,25 @@ final class IoReceiver extends Thread } catch (Throwable t) { - receiver.exception(t); + if (!(shutdownBroken && + t instanceof SocketException && + t.getMessage().equalsIgnoreCase("socket closed") && + closed.get())) + { + receiver.exception(t); + } } finally { receiver.closed(); - transport.getSender().close(); + try + { + socket.close(); + } + catch(Exception e) + { + log.warn(e, "Error closing socket"); + } } } |