diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java | 42 |
1 files changed, 35 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 9f2ba1d84d..3932b098cd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -153,6 +153,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; + + /** The last failover exception that occured */ + private FailoverException _lastFailoverException; + /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; @@ -419,6 +423,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + public void notifyFailoverStarting() + { + // Set the last exception in the sync block to ensure the ordering with add. + // either this gets done and the add does the ml.error + // or the add completes first and the iterator below will do ml.error + synchronized (_frameListeners) + { + _lastFailoverException = new FailoverException("Failing over about to start"); + } + + propagateExceptionToWaiters(_lastFailoverException); + } + + public void failoverInProgress() + { + _lastFailoverException = null; + } + private static int _messageReceivedCount; public void messageReceived(IoSession session, Object message) throws Exception @@ -471,11 +493,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame); try - { + { - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); if (!_frameListeners.isEmpty()) { + //This iterator is safe from the error state as the frame listeners always add before they send so their + // will be ready and waiting for this response. Iterator it = _frameListeners.iterator(); while (it.hasNext()) { @@ -592,7 +616,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter { try { - _frameListeners.add(listener); + synchronized (_frameListeners) + { + if (_lastFailoverException != null) + { + throw _lastFailoverException; + } + + _frameListeners.add(listener); + } _protocolSession.writeFrame(frame); AMQMethodEvent e = listener.blockForFrame(timeout); @@ -601,10 +633,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } - catch (AMQException e) - { - throw e; - } finally { // If we don't removeKey the listener then no-one will |