From 0717d779e443044d090d72ab5176d630c4eb5f6e Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Sun, 25 Mar 2012 00:15:56 +0000 Subject: QPID-3903: Session#close() should not wait forever if broker fails to respond to channel close (0-8..0-9-1 protocols) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1304971 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/protocol/AMQProtocolHandler.java | 3 +- .../apache/qpid/client/util/BlockingWaiter.java | 39 ++++++++++------------ 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d380402da7..b314453e31 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -652,7 +652,8 @@ public class AMQProtocolHandler implements ProtocolEngine } writeFrame(frame); - return listener.blockForFrame(timeout); + long actualTimeout = timeout == -1 ? DEFAULT_SYNC_TIMEOUT : timeout; + return listener.blockForFrame(actualTimeout); // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 80d171592f..22dc17e53c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -39,7 +39,7 @@ import java.util.concurrent.locks.ReentrantLock; * differs from a 'rendezvous' in that sense. * *

BlockingWaiters are used to coordinate when waiting for an an event that expect a response. - * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * They are always used in a 'one-shot' manner, that is, to receive just one response. Usually the caller has to register * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they * have been completed. * @@ -51,12 +51,12 @@ import java.util.concurrent.locks.ReentrantLock; *

*
CRC Card
Responsibilities Collaborations *
Accept generic objects as events for processing via {@link #process}. - *
Delegate handling and undserstanding of the object to a concrete implementation. + *
Delegate handling and understanding of the object to a concrete implementation. *
Block until {@link #process} determines that waiting is no longer required *
Propagate the most recent exception to the consumer. *
* - * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * @todo Interruption is caught but not handled. This could be allowed to fall through. This might actually be useful * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry * when this happens. At the very least, restore the interrupted status flag. * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to @@ -84,13 +84,13 @@ public abstract class BlockingWaiter /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; - /** Holds the incomming Object. */ + /** Holds the incoming Object. */ private Object _doneObject = null; private AtomicBoolean _waiting = new AtomicBoolean(false); private boolean _closed = false; /** - * Delegates processing of the incomming object to the handler. + * Delegates processing of the incoming object to the handler. * * @param object The object to process. * @@ -146,6 +146,11 @@ public abstract class BlockingWaiter */ public Object block(long timeout) throws AMQException, FailoverException { + if (timeout < 0) + { + throw new IllegalArgumentException("timeout must be zero or greater"); + } + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); _lock.lock(); @@ -165,26 +170,18 @@ public abstract class BlockingWaiter { try { - if (timeout == -1) - { - _receivedCondition.await(); - } - else - { - nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); - if (nanoTimeout <= 0 && !_ready && _error == null) - { - _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); - _ready = true; - } + if (nanoTimeout <= 0 && !_ready && _error == null) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); + _ready = true; } } catch (InterruptedException e) { _logger.error(e.getMessage(), e); - // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivalent to success } } } @@ -285,8 +282,8 @@ public abstract class BlockingWaiter /** * Close this Waiter so that no more errors are processed. * This is a preventative method to ensure that a second error thread does not get stuck in the error method after - * the await has returned. This has not happend but in practise but if two errors occur on the Connection at - * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a + * the await has returned. This has not happened but in practise but if two errors occur on the Connection at + * the same time then it is conceivably possible for the second to get stuck if the first one is processed by a * waiter. * * Once closed any attempt to wait will throw an exception. -- cgit v1.2.1