diff options
author | Keith Wall <kwall@apache.org> | 2013-11-25 09:16:10 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2013-11-25 09:16:10 +0000 |
commit | 0cc7671ff71f2163a7382b83f216d5366d7b8183 (patch) | |
tree | 5ccfe15ee6f15483a911ab9c165eb778b2a2ec7b | |
parent | 187a35a20459cdb9be1bb94309ce4a91c7b38572 (diff) | |
download | qpid-python-0cc7671ff71f2163a7382b83f216d5366d7b8183.tar.gz |
QPID-5282: Change IoSender to cause the socket to be closed after a sender timeout.
Merged from trunk with the following command:
svn merge -c1543721 https://svn.apache.org/repos/asf/qpid/trunk/qpid
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.26@1545184 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java | 80 |
1 files changed, 52 insertions, 28 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index a58fea47d2..17400cca8b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -60,6 +60,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; private final List<Closeable> _listeners = new ArrayList<Closeable>(); + private final String _remoteSocketAddress; private volatile Throwable exception = null; @@ -68,6 +69,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> this.socket = socket; this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; + _remoteSocketAddress = socket.getRemoteSocketAddress().toString(); try { @@ -89,7 +91,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); } public void initiate() @@ -109,13 +111,11 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void send(ByteBuffer buf) { - if (closed.get()) - { - throw new SenderClosedException("sender is closed", exception); - } + checkNotAlreadyClosed(); + if(!senderThread.isAlive()) { - throw new SenderException("sender thread not alive"); + throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress)); } final int size = buffer.length; @@ -131,7 +131,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> flush(); synchronized (notFull) { - long start = System.currentTimeMillis(); + final long start = System.currentTimeMillis(); long elapsed = 0; while (!closed.get() && head - tail >= size && elapsed < timeout) { @@ -146,14 +146,19 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> elapsed += System.currentTimeMillis() - start; } - if (closed.get()) - { - throw new SenderClosedException("sender is closed", exception); - } + checkNotAlreadyClosed(); if (head - tail >= size) { - throw new SenderException(String.format("write timed out: %s, %s", head, tail)); + try + { + log.error("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail); + throw new SenderException(String.format("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail)); + } + finally + { + close(false, false); + } } } continue; @@ -191,10 +196,10 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void close() { - close(true); + close(true, true); } - void close(boolean reportException) + private void close(boolean awaitSenderBeforeClose, boolean reportException) { if (!closed.getAndSet(true)) { @@ -210,21 +215,11 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - if (Thread.currentThread() != senderThread) + if (awaitSenderBeforeClose) { - senderThread.join(timeout); - if (senderThread.isAlive()) - { - log.error("join timed out"); - throw new SenderException("join timed out"); - } + awaitSenderThreadShutdown(); } } - catch (InterruptedException e) - { - log.error("interrupted whilst waiting for sender thread to stop"); - throw new SenderException(e); - } finally { closeListeners(); @@ -247,7 +242,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } catch(Exception e) { - log.error("Exception closing listener: " + e.getMessage()); + log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress); ex = e; } } @@ -316,7 +311,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { log.error(e, "error in write thread"); exception = e; - close(false); + close(false, false); break; } tail += length; @@ -346,4 +341,33 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { _listeners.add(listener); } + + private void awaitSenderThreadShutdown() + { + if (Thread.currentThread() != senderThread) + { + try + { + senderThread.join(timeout); + if (senderThread.isAlive()) + { + log.error("join timed out for socket %s to stop", _remoteSocketAddress); + throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress)); + } + } + catch (InterruptedException e) + { + log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress); + throw new SenderException(e); + } + } + } + + private void checkNotAlreadyClosed() + { + if (closed.get()) + { + throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception); + } + } } |