diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java | 175 |
1 files changed, 90 insertions, 85 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 66b97e8225..380e64c0f4 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -25,51 +25,49 @@ import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Transport; import org.apache.qpid.transport.util.Logger; - public final class IoSender implements Runnable, Sender<ByteBuffer> { - - private static final Logger log = Logger.get(IoSender.class); + private static final Logger _log = Logger.get(IoSender.class); // by starting here, we ensure that we always test the wraparound // case, we should probably make this configurable somehow so that // we can test other cases as well - private final static int START = Integer.MAX_VALUE - 10; - - private final IoContext ioCtx; - private final long timeout; - private final Socket socket; - private final OutputStream out; - - private final byte[] buffer; - private volatile int head = START; - private volatile int tail = START; - private volatile boolean idle = true; - private final Object notFull = new Object(); - private final Object notEmpty = new Object(); - private final AtomicBoolean closed = new AtomicBoolean(false); - private final Thread senderThread; - - private volatile Throwable exception = null; - - - public IoSender(IoContext ioCtx, int bufferSize, long timeout) + private static final int START = Integer.MAX_VALUE - 10; + private static final AtomicLong _id = new AtomicLong(0); + + private final long _timeout; + private final Socket _socket; + private final OutputStream _out; + + private final byte[] _buffer; + private volatile int _head = START; + private volatile int _tail = START; + private volatile boolean _idle = true; + private final Object _notFull = new Object(); + private final Object _notEmpty = new Object(); + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final Thread _senderThread; + + private volatile Throwable _exception = null; + + public IoSender(Socket socket, int bufferSize, long timeout) { - this.ioCtx = ioCtx; - this.socket = ioCtx.getSocket(); - this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 - this.timeout = timeout; + _socket = socket; + _buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 + _timeout = timeout; try { - out = socket.getOutputStream(); + _out = socket.getOutputStream(); } catch (IOException e) { @@ -78,16 +76,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - senderThread = Threading.getThreadFactory().createThread(this); + _senderThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { throw new Error("Error creating IOSender thread",e); } - senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); - senderThread.start(); + _senderThread.setDaemon(true); + _senderThread.setName(String.format("IoSender-%d-%s", _id.getAndIncrement(), socket.getRemoteSocketAddress())); + _senderThread.start(); } private static final int pof2(int n) @@ -102,31 +100,31 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void send(ByteBuffer buf) { - if (closed.get()) + if (_closed.get()) { - throw new SenderException("sender is closed", exception); + throw new SenderException("sender is closed", _exception); } - final int size = buffer.length; + final int size = _buffer.length; int remaining = buf.remaining(); while (remaining > 0) { - final int hd = head; - final int tl = tail; + final int hd = _head; + final int tl = _tail; if (hd - tl >= size) { flush(); - synchronized (notFull) + synchronized (_notFull) { long start = System.currentTimeMillis(); long elapsed = 0; - while (!closed.get() && head - tail >= size && elapsed < timeout) + while (!_closed.get() && _head - _tail >= size && elapsed < _timeout) { try { - notFull.wait(timeout - elapsed); + _notFull.wait(_timeout - elapsed); } catch (InterruptedException e) { @@ -135,14 +133,14 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> elapsed += System.currentTimeMillis() - start; } - if (closed.get()) + if (_closed.get()) { - throw new SenderException("sender is closed", exception); + throw new SenderException("sender is closed", _exception); } - if (head - tail >= size) + if (_head - _tail >= size) { - throw new SenderException(String.format("write timed out: %s, %s", head, tail)); + throw new SenderException(String.format("write timed out: %s, %s", _head, _tail)); } } continue; @@ -161,90 +159,97 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> length = Math.min(size - hd_idx, remaining); } - buf.get(buffer, hd_idx, length); - head += length; + buf.get(_buffer, hd_idx, length); + _head += length; remaining -= length; } + flush(); } public void flush() { - if (idle) + if (_idle) { - synchronized (notEmpty) + synchronized (_notEmpty) { - notEmpty.notify(); + _notEmpty.notify(); } } } public void close() { - close(true); - } - - void close(boolean reportException) - { - if (!closed.getAndSet(true)) + if (!_closed.getAndSet(true)) { - synchronized (notFull) + synchronized (_notFull) { - notFull.notify(); + _notFull.notify(); } - synchronized (notEmpty) + synchronized (_notEmpty) { - notEmpty.notify(); + _notEmpty.notify(); } try { - if (Thread.currentThread() != senderThread) + if (Thread.currentThread() != _senderThread) { - senderThread.join(timeout); - if (senderThread.isAlive()) + if (Transport.WINDOWS) + { + _socket.close(); + } + else + { + _socket.shutdownOutput(); + } + _senderThread.join(_timeout); + if (_senderThread.isAlive()) { - throw new SenderException("join timed out"); + throw new SenderException("senderThread join timed out"); } } - ioCtx.getReceiver().close(false); } catch (InterruptedException e) { - throw new SenderException(e); + throw new SenderException("Close interrupted", e); + } + catch (IOException e) + { + throw new SenderException("IO error closing", e); } - if (reportException && exception != null) + if (_exception != null) { - throw new SenderException(exception); + throw new SenderException(_exception); } } } public void run() { - final int size = buffer.length; + final int size = _buffer.length; while (true) { - final int hd = head; - final int tl = tail; + final int hd = _head; + final int tl = _tail; if (hd == tl) { - if (closed.get()) + if (_closed.get()) { break; } - idle = true; + _idle = true; - synchronized (notEmpty) + synchronized (_notEmpty) { - while (head == tail && !closed.get()) + while (_head == _tail && !_closed.get()) { try { - notEmpty.wait(); + _notEmpty.wait(); } catch (InterruptedException e) { @@ -253,7 +258,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - idle = false; + _idle = false; continue; } @@ -273,21 +278,21 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - out.write(buffer, tl_idx, length); + _out.write(_buffer, tl_idx, length); } catch (IOException e) { - log.error(e, "error in write thread"); - exception = e; - close(false); + _log.error(e, "error in write thread"); + _exception = e; + close(); break; } - tail += length; - if (head - tl >= size) + _tail += length; + if (_head - tl >= size) { - synchronized (notFull) + synchronized (_notFull) { - notFull.notify(); + _notFull.notify(); } } } @@ -297,7 +302,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { try { - socket.setSoTimeout(i); + _socket.setSoTimeout(i); } catch (Exception e) { |