diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java | 6 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 42 |
2 files changed, 40 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 9911b75b80..a944ff6bec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -120,13 +120,17 @@ public class FailoverHandler implements Runnable // We wake up listeners. If they can handle failover, they will extend the // FailoverRetrySupport class and will in turn block on the latch until failover // has completed before retrying the operation. - _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start")); + _amqProtocolHandler.notifyFailoverStarting(); // Since failover impacts several structures we protect them all with a single mutex. These structures // are also in child objects of the connection. This allows us to manipulate them without affecting // client code which runs in a separate thread. synchronized (_amqProtocolHandler.getConnection().getFailoverMutex()) { + //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so + // we can clear the exception. + _amqProtocolHandler.failoverInProgress(); + // We switch in a new state manager temporarily so that the interaction to get to the "connection open" // state works, without us having to terminate any existing "state waiters". We could theoretically // have a state waiter waiting until the connection is closed for some reason. Or in future we may have diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 9f2ba1d84d..3932b098cd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -153,6 +153,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; + + /** The last failover exception that occured */ + private FailoverException _lastFailoverException; + /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; @@ -419,6 +423,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + public void notifyFailoverStarting() + { + // Set the last exception in the sync block to ensure the ordering with add. + // either this gets done and the add does the ml.error + // or the add completes first and the iterator below will do ml.error + synchronized (_frameListeners) + { + _lastFailoverException = new FailoverException("Failing over about to start"); + } + + propagateExceptionToWaiters(_lastFailoverException); + } + + public void failoverInProgress() + { + _lastFailoverException = null; + } + private static int _messageReceivedCount; public void messageReceived(IoSession session, Object message) throws Exception @@ -471,11 +493,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame); try - { + { - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); if (!_frameListeners.isEmpty()) { + //This iterator is safe from the error state as the frame listeners always add before they send so their + // will be ready and waiting for this response. Iterator it = _frameListeners.iterator(); while (it.hasNext()) { @@ -592,7 +616,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter { try { - _frameListeners.add(listener); + synchronized (_frameListeners) + { + if (_lastFailoverException != null) + { + throw _lastFailoverException; + } + + _frameListeners.add(listener); + } _protocolSession.writeFrame(frame); AMQMethodEvent e = listener.blockForFrame(timeout); @@ -601,10 +633,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } - catch (AMQException e) - { - throw e; - } finally { // If we don't removeKey the listener then no-one will |