summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java42
2 files changed, 40 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 9911b75b80..a944ff6bec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -120,13 +120,17 @@ public class FailoverHandler implements Runnable
// We wake up listeners. If they can handle failover, they will extend the
// FailoverRetrySupport class and will in turn block on the latch until failover
// has completed before retrying the operation.
- _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+ _amqProtocolHandler.notifyFailoverStarting();
// Since failover impacts several structures we protect them all with a single mutex. These structures
// are also in child objects of the connection. This allows us to manipulate them without affecting
// client code which runs in a separate thread.
synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
{
+ //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so
+ // we can clear the exception.
+ _amqProtocolHandler.failoverInProgress();
+
// We switch in a new state manager temporarily so that the interaction to get to the "connection open"
// state works, without us having to terminate any existing "state waiters". We could theoretically
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 9f2ba1d84d..3932b098cd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -153,6 +153,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
+
+ /** The last failover exception that occured */
+ private FailoverException _lastFailoverException;
+
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
@@ -419,6 +423,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ public void notifyFailoverStarting()
+ {
+ // Set the last exception in the sync block to ensure the ordering with add.
+ // either this gets done and the add does the ml.error
+ // or the add completes first and the iterator below will do ml.error
+ synchronized (_frameListeners)
+ {
+ _lastFailoverException = new FailoverException("Failing over about to start");
+ }
+
+ propagateExceptionToWaiters(_lastFailoverException);
+ }
+
+ public void failoverInProgress()
+ {
+ _lastFailoverException = null;
+ }
+
private static int _messageReceivedCount;
public void messageReceived(IoSession session, Object message) throws Exception
@@ -471,11 +493,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
try
- {
+ {
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
if (!_frameListeners.isEmpty())
{
+ //This iterator is safe from the error state as the frame listeners always add before they send so their
+ // will be ready and waiting for this response.
Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
@@ -592,7 +616,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
try
{
- _frameListeners.add(listener);
+ synchronized (_frameListeners)
+ {
+ if (_lastFailoverException != null)
+ {
+ throw _lastFailoverException;
+ }
+
+ _frameListeners.add(listener);
+ }
_protocolSession.writeFrame(frame);
AMQMethodEvent e = listener.blockForFrame(timeout);
@@ -601,10 +633,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
- catch (AMQException e)
- {
- throw e;
- }
finally
{
// If we don't removeKey the listener then no-one will