summaryrefslogtreecommitdiff
path: root/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java68
1 files changed, 13 insertions, 55 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 427487c879..66b97e8225 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -24,14 +24,10 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -47,6 +43,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
// we can test other cases as well
private final static int START = Integer.MAX_VALUE - 10;
+ private final IoContext ioCtx;
private final long timeout;
private final Socket socket;
private final OutputStream out;
@@ -59,13 +56,14 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
- private final List<Closeable> _listeners = new ArrayList<Closeable>();
-
+
private volatile Throwable exception = null;
- public IoSender(Socket socket, int bufferSize, long timeout)
+
+ public IoSender(IoContext ioCtx, int bufferSize, long timeout)
{
- this.socket = socket;
+ this.ioCtx = ioCtx;
+ this.socket = ioCtx.getSocket();
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
@@ -80,20 +78,15 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
try
{
- //Create but deliberately don't start the thread.
- senderThread = Threading.getThreadFactory().createThread(this);
+ senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
throw new Error("Error creating IOSender thread",e);
}
-
+
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
- }
-
- public void initiate()
- {
senderThread.start();
}
@@ -111,11 +104,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
{
if (closed.get())
{
- throw new SenderClosedException("sender is closed", exception);
- }
- if(!senderThread.isAlive())
- {
- throw new SenderException("sender thread not alive");
+ throw new SenderException("sender is closed", exception);
}
final int size = buffer.length;
@@ -148,7 +137,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
if (closed.get())
{
- throw new SenderClosedException("sender is closed", exception);
+ throw new SenderException("sender is closed", exception);
}
if (head - tail >= size)
@@ -215,20 +204,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
senderThread.join(timeout);
if (senderThread.isAlive())
{
- log.error("join timed out");
throw new SenderException("join timed out");
}
}
+ ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
- log.error("interrupted whilst waiting for sender thread to stop");
throw new SenderException(e);
}
- finally
- {
- closeListeners();
- }
+
if (reportException && exception != null)
{
throw new SenderException(exception);
@@ -236,31 +221,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
- private void closeListeners()
- {
- Exception ex = null;
- for(Closeable listener : _listeners)
- {
- try
- {
- listener.close();
- }
- catch(Exception e)
- {
- log.error("Exception closing listener: " + e.getMessage());
- ex = e;
- }
- }
-
- if (ex != null)
- {
- throw new SenderException(ex.getMessage(), ex);
- }
- }
-
public void run()
{
- final int size = buffer.length;
+ final int size = buffer.length;
while (true)
{
final int hd = head;
@@ -341,9 +304,4 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
throw new SenderException(e);
}
}
-
- public void registerCloseListener(Closeable listener)
- {
- _listeners.add(listener);
- }
}