summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2013-11-25 09:16:10 +0000
committerKeith Wall <kwall@apache.org>2013-11-25 09:16:10 +0000
commit0cc7671ff71f2163a7382b83f216d5366d7b8183 (patch)
tree5ccfe15ee6f15483a911ab9c165eb778b2a2ec7b
parent187a35a20459cdb9be1bb94309ce4a91c7b38572 (diff)
downloadqpid-python-0cc7671ff71f2163a7382b83f216d5366d7b8183.tar.gz
QPID-5282: Change IoSender to cause the socket to be closed after a sender timeout.
Merged from trunk with the following command: svn merge -c1543721 https://svn.apache.org/repos/asf/qpid/trunk/qpid git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1545184 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java80
1 files changed, 52 insertions, 28 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index a58fea47d2..17400cca8b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -60,6 +60,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
private final List<Closeable> _listeners = new ArrayList<Closeable>();
+ private final String _remoteSocketAddress;
private volatile Throwable exception = null;
@@ -68,6 +69,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
this.socket = socket;
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
+ _remoteSocketAddress = socket.getRemoteSocketAddress().toString();
try
{
@@ -89,7 +91,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
senderThread.setDaemon(true);
- senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
}
public void initiate()
@@ -109,13 +111,11 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
public void send(ByteBuffer buf)
{
- if (closed.get())
- {
- throw new SenderClosedException("sender is closed", exception);
- }
+ checkNotAlreadyClosed();
+
if(!senderThread.isAlive())
{
- throw new SenderException("sender thread not alive");
+ throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress));
}
final int size = buffer.length;
@@ -131,7 +131,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
flush();
synchronized (notFull)
{
- long start = System.currentTimeMillis();
+ final long start = System.currentTimeMillis();
long elapsed = 0;
while (!closed.get() && head - tail >= size && elapsed < timeout)
{
@@ -146,14 +146,19 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
elapsed += System.currentTimeMillis() - start;
}
- if (closed.get())
- {
- throw new SenderClosedException("sender is closed", exception);
- }
+ checkNotAlreadyClosed();
if (head - tail >= size)
{
- throw new SenderException(String.format("write timed out: %s, %s", head, tail));
+ try
+ {
+ log.error("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail);
+ throw new SenderException(String.format("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail));
+ }
+ finally
+ {
+ close(false, false);
+ }
}
}
continue;
@@ -191,10 +196,10 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
public void close()
{
- close(true);
+ close(true, true);
}
- void close(boolean reportException)
+ private void close(boolean awaitSenderBeforeClose, boolean reportException)
{
if (!closed.getAndSet(true))
{
@@ -210,21 +215,11 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
try
{
- if (Thread.currentThread() != senderThread)
+ if (awaitSenderBeforeClose)
{
- senderThread.join(timeout);
- if (senderThread.isAlive())
- {
- log.error("join timed out");
- throw new SenderException("join timed out");
- }
+ awaitSenderThreadShutdown();
}
}
- catch (InterruptedException e)
- {
- log.error("interrupted whilst waiting for sender thread to stop");
- throw new SenderException(e);
- }
finally
{
closeListeners();
@@ -247,7 +242,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
catch(Exception e)
{
- log.error("Exception closing listener: " + e.getMessage());
+ log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress);
ex = e;
}
}
@@ -316,7 +311,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
{
log.error(e, "error in write thread");
exception = e;
- close(false);
+ close(false, false);
break;
}
tail += length;
@@ -346,4 +341,33 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
{
_listeners.add(listener);
}
+
+ private void awaitSenderThreadShutdown()
+ {
+ if (Thread.currentThread() != senderThread)
+ {
+ try
+ {
+ senderThread.join(timeout);
+ if (senderThread.isAlive())
+ {
+ log.error("join timed out for socket %s to stop", _remoteSocketAddress);
+ throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress);
+ throw new SenderException(e);
+ }
+ }
+ }
+
+ private void checkNotAlreadyClosed()
+ {
+ if (closed.get())
+ {
+ throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception);
+ }
+ }
}