diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java | 68 |
1 files changed, 13 insertions, 55 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 427487c879..66b97e8225 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,14 +24,10 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; @@ -47,6 +43,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> // we can test other cases as well private final static int START = Integer.MAX_VALUE - 10; + private final IoContext ioCtx; private final long timeout; private final Socket socket; private final OutputStream out; @@ -59,13 +56,14 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; - private final List<Closeable> _listeners = new ArrayList<Closeable>(); - + private volatile Throwable exception = null; - public IoSender(Socket socket, int bufferSize, long timeout) + + public IoSender(IoContext ioCtx, int bufferSize, long timeout) { - this.socket = socket; + this.ioCtx = ioCtx; + this.socket = ioCtx.getSocket(); this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; @@ -80,20 +78,15 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - //Create but deliberately don't start the thread. - senderThread = Threading.getThreadFactory().createThread(this); + senderThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { throw new Error("Error creating IOSender thread",e); } - + senderThread.setDaemon(true); senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - } - - public void initiate() - { senderThread.start(); } @@ -111,11 +104,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { if (closed.get()) { - throw new SenderClosedException("sender is closed", exception); - } - if(!senderThread.isAlive()) - { - throw new SenderException("sender thread not alive"); + throw new SenderException("sender is closed", exception); } final int size = buffer.length; @@ -148,7 +137,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> if (closed.get()) { - throw new SenderClosedException("sender is closed", exception); + throw new SenderException("sender is closed", exception); } if (head - tail >= size) @@ -215,20 +204,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> senderThread.join(timeout); if (senderThread.isAlive()) { - log.error("join timed out"); throw new SenderException("join timed out"); } } + ioCtx.getReceiver().close(false); } catch (InterruptedException e) { - log.error("interrupted whilst waiting for sender thread to stop"); throw new SenderException(e); } - finally - { - closeListeners(); - } + if (reportException && exception != null) { throw new SenderException(exception); @@ -236,31 +221,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - private void closeListeners() - { - Exception ex = null; - for(Closeable listener : _listeners) - { - try - { - listener.close(); - } - catch(Exception e) - { - log.error("Exception closing listener: " + e.getMessage()); - ex = e; - } - } - - if (ex != null) - { - throw new SenderException(ex.getMessage(), ex); - } - } - public void run() { - final int size = buffer.length; + final int size = buffer.length; while (true) { final int hd = head; @@ -341,9 +304,4 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> throw new SenderException(e); } } - - public void registerCloseListener(Closeable listener) - { - _listeners.add(listener); - } } |