summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java175
1 files changed, 90 insertions, 85 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 66b97e8225..380e64c0f4 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -25,51 +25,49 @@ import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.util.Logger;
-
public final class IoSender implements Runnable, Sender<ByteBuffer>
{
-
- private static final Logger log = Logger.get(IoSender.class);
+ private static final Logger _log = Logger.get(IoSender.class);
// by starting here, we ensure that we always test the wraparound
// case, we should probably make this configurable somehow so that
// we can test other cases as well
- private final static int START = Integer.MAX_VALUE - 10;
-
- private final IoContext ioCtx;
- private final long timeout;
- private final Socket socket;
- private final OutputStream out;
-
- private final byte[] buffer;
- private volatile int head = START;
- private volatile int tail = START;
- private volatile boolean idle = true;
- private final Object notFull = new Object();
- private final Object notEmpty = new Object();
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Thread senderThread;
-
- private volatile Throwable exception = null;
-
-
- public IoSender(IoContext ioCtx, int bufferSize, long timeout)
+ private static final int START = Integer.MAX_VALUE - 10;
+ private static final AtomicLong _id = new AtomicLong(0);
+
+ private final long _timeout;
+ private final Socket _socket;
+ private final OutputStream _out;
+
+ private final byte[] _buffer;
+ private volatile int _head = START;
+ private volatile int _tail = START;
+ private volatile boolean _idle = true;
+ private final Object _notFull = new Object();
+ private final Object _notEmpty = new Object();
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final Thread _senderThread;
+
+ private volatile Throwable _exception = null;
+
+ public IoSender(Socket socket, int bufferSize, long timeout)
{
- this.ioCtx = ioCtx;
- this.socket = ioCtx.getSocket();
- this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
- this.timeout = timeout;
+ _socket = socket;
+ _buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
+ _timeout = timeout;
try
{
- out = socket.getOutputStream();
+ _out = socket.getOutputStream();
}
catch (IOException e)
{
@@ -78,16 +76,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
try
{
- senderThread = Threading.getThreadFactory().createThread(this);
+ _senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
throw new Error("Error creating IOSender thread",e);
}
- senderThread.setDaemon(true);
- senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
- senderThread.start();
+ _senderThread.setDaemon(true);
+ _senderThread.setName(String.format("IoSender-%d-%s", _id.getAndIncrement(), socket.getRemoteSocketAddress()));
+ _senderThread.start();
}
private static final int pof2(int n)
@@ -102,31 +100,31 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
public void send(ByteBuffer buf)
{
- if (closed.get())
+ if (_closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderException("sender is closed", _exception);
}
- final int size = buffer.length;
+ final int size = _buffer.length;
int remaining = buf.remaining();
while (remaining > 0)
{
- final int hd = head;
- final int tl = tail;
+ final int hd = _head;
+ final int tl = _tail;
if (hd - tl >= size)
{
flush();
- synchronized (notFull)
+ synchronized (_notFull)
{
long start = System.currentTimeMillis();
long elapsed = 0;
- while (!closed.get() && head - tail >= size && elapsed < timeout)
+ while (!_closed.get() && _head - _tail >= size && elapsed < _timeout)
{
try
{
- notFull.wait(timeout - elapsed);
+ _notFull.wait(_timeout - elapsed);
}
catch (InterruptedException e)
{
@@ -135,14 +133,14 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
elapsed += System.currentTimeMillis() - start;
}
- if (closed.get())
+ if (_closed.get())
{
- throw new SenderException("sender is closed", exception);
+ throw new SenderException("sender is closed", _exception);
}
- if (head - tail >= size)
+ if (_head - _tail >= size)
{
- throw new SenderException(String.format("write timed out: %s, %s", head, tail));
+ throw new SenderException(String.format("write timed out: %s, %s", _head, _tail));
}
}
continue;
@@ -161,90 +159,97 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
length = Math.min(size - hd_idx, remaining);
}
- buf.get(buffer, hd_idx, length);
- head += length;
+ buf.get(_buffer, hd_idx, length);
+ _head += length;
remaining -= length;
}
+ flush();
}
public void flush()
{
- if (idle)
+ if (_idle)
{
- synchronized (notEmpty)
+ synchronized (_notEmpty)
{
- notEmpty.notify();
+ _notEmpty.notify();
}
}
}
public void close()
{
- close(true);
- }
-
- void close(boolean reportException)
- {
- if (!closed.getAndSet(true))
+ if (!_closed.getAndSet(true))
{
- synchronized (notFull)
+ synchronized (_notFull)
{
- notFull.notify();
+ _notFull.notify();
}
- synchronized (notEmpty)
+ synchronized (_notEmpty)
{
- notEmpty.notify();
+ _notEmpty.notify();
}
try
{
- if (Thread.currentThread() != senderThread)
+ if (Thread.currentThread() != _senderThread)
{
- senderThread.join(timeout);
- if (senderThread.isAlive())
+ if (Transport.WINDOWS)
+ {
+ _socket.close();
+ }
+ else
+ {
+ _socket.shutdownOutput();
+ }
+ _senderThread.join(_timeout);
+ if (_senderThread.isAlive())
{
- throw new SenderException("join timed out");
+ throw new SenderException("senderThread join timed out");
}
}
- ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
- throw new SenderException(e);
+ throw new SenderException("Close interrupted", e);
+ }
+ catch (IOException e)
+ {
+ throw new SenderException("IO error closing", e);
}
- if (reportException && exception != null)
+ if (_exception != null)
{
- throw new SenderException(exception);
+ throw new SenderException(_exception);
}
}
}
public void run()
{
- final int size = buffer.length;
+ final int size = _buffer.length;
while (true)
{
- final int hd = head;
- final int tl = tail;
+ final int hd = _head;
+ final int tl = _tail;
if (hd == tl)
{
- if (closed.get())
+ if (_closed.get())
{
break;
}
- idle = true;
+ _idle = true;
- synchronized (notEmpty)
+ synchronized (_notEmpty)
{
- while (head == tail && !closed.get())
+ while (_head == _tail && !_closed.get())
{
try
{
- notEmpty.wait();
+ _notEmpty.wait();
}
catch (InterruptedException e)
{
@@ -253,7 +258,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
- idle = false;
+ _idle = false;
continue;
}
@@ -273,21 +278,21 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
try
{
- out.write(buffer, tl_idx, length);
+ _out.write(_buffer, tl_idx, length);
}
catch (IOException e)
{
- log.error(e, "error in write thread");
- exception = e;
- close(false);
+ _log.error(e, "error in write thread");
+ _exception = e;
+ close();
break;
}
- tail += length;
- if (head - tl >= size)
+ _tail += length;
+ if (_head - tl >= size)
{
- synchronized (notFull)
+ synchronized (_notFull)
{
- notFull.notify();
+ _notFull.notify();
}
}
}
@@ -297,7 +302,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
{
try
{
- socket.setSoTimeout(i);
+ _socket.setSoTimeout(i);
}
catch (Exception e)
{