diff options
author | Keith Wall <kwall@apache.org> | 2013-11-20 06:59:44 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2013-11-20 06:59:44 +0000 |
commit | ec2f08ec82cc9f6b73d20117356dc826ad99a8b1 (patch) | |
tree | 618acec857e7f38a4a47526e7bd872ca6bac5e56 /java/common/src | |
parent | ae18ff9038f8018b82242e746a4c5d7b1e676c1e (diff) | |
download | qpid-python-ec2f08ec82cc9f6b73d20117356dc826ad99a8b1.tar.gz |
QPID-5282: Change IoSender to cause the socket to be closed after a sender timeout
IoSender#send now causes the socket to be closed in response to a sender timeout (in addition to the SenderException).
Note that this code path avoids the close causing the sender thread join (as this would most likely timeout too). Also
improved log/exception messages to include the remote socket address (to aid problem diagnosis).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1543721 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java | 80 |
1 files changed, 52 insertions, 28 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index a58fea47d2..17400cca8b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -60,6 +60,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; private final List<Closeable> _listeners = new ArrayList<Closeable>(); + private final String _remoteSocketAddress; private volatile Throwable exception = null; @@ -68,6 +69,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> this.socket = socket; this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; + _remoteSocketAddress = socket.getRemoteSocketAddress().toString(); try { @@ -89,7 +91,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } senderThread.setDaemon(true); - senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); } public void initiate() @@ -109,13 +111,11 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void send(ByteBuffer buf) { - if (closed.get()) - { - throw new SenderClosedException("sender is closed", exception); - } + checkNotAlreadyClosed(); + if(!senderThread.isAlive()) { - throw new SenderException("sender thread not alive"); + throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress)); } final int size = buffer.length; @@ -131,7 +131,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> flush(); synchronized (notFull) { - long start = System.currentTimeMillis(); + final long start = System.currentTimeMillis(); long elapsed = 0; while (!closed.get() && head - tail >= size && elapsed < timeout) { @@ -146,14 +146,19 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> elapsed += System.currentTimeMillis() - start; } - if (closed.get()) - { - throw new SenderClosedException("sender is closed", exception); - } + checkNotAlreadyClosed(); if (head - tail >= size) { - throw new SenderException(String.format("write timed out: %s, %s", head, tail)); + try + { + log.error("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail); + throw new SenderException(String.format("write timed out for socket %s: head %d, tail %d", _remoteSocketAddress, head, tail)); + } + finally + { + close(false, false); + } } } continue; @@ -191,10 +196,10 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void close() { - close(true); + close(true, true); } - void close(boolean reportException) + private void close(boolean awaitSenderBeforeClose, boolean reportException) { if (!closed.getAndSet(true)) { @@ -210,21 +215,11 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { - if (Thread.currentThread() != senderThread) + if (awaitSenderBeforeClose) { - senderThread.join(timeout); - if (senderThread.isAlive()) - { - log.error("join timed out"); - throw new SenderException("join timed out"); - } + awaitSenderThreadShutdown(); } } - catch (InterruptedException e) - { - log.error("interrupted whilst waiting for sender thread to stop"); - throw new SenderException(e); - } finally { closeListeners(); @@ -247,7 +242,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } catch(Exception e) { - log.error("Exception closing listener: " + e.getMessage()); + log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress); ex = e; } } @@ -316,7 +311,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { log.error(e, "error in write thread"); exception = e; - close(false); + close(false, false); break; } tail += length; @@ -346,4 +341,33 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> { _listeners.add(listener); } + + private void awaitSenderThreadShutdown() + { + if (Thread.currentThread() != senderThread) + { + try + { + senderThread.join(timeout); + if (senderThread.isAlive()) + { + log.error("join timed out for socket %s to stop", _remoteSocketAddress); + throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress)); + } + } + catch (InterruptedException e) + { + log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress); + throw new SenderException(e); + } + } + } + + private void checkNotAlreadyClosed() + { + if (closed.get()) + { + throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception); + } + } } |