summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-03-21 11:12:29 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-03-21 11:12:29 +0000
commit1077fc8f79b764b553dab6a4db01337721695f31 (patch)
tree380d166a72cb2318d1a86656dae12b9f6436fdff
parente611a8b65f0314de59c6000cb39e85ae15a8b768 (diff)
downloadqpid-python-1077fc8f79b764b553dab6a4db01337721695f31.tar.gz
QPID-866 : Based on Patch from ASkinner. Only the FailoverException makes sence to process this way so remove list and synchronized so we either do an add or throw the set FailoverException.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@639598 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java42
2 files changed, 40 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 9911b75b80..a944ff6bec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -120,13 +120,17 @@ public class FailoverHandler implements Runnable
// We wake up listeners. If they can handle failover, they will extend the
// FailoverRetrySupport class and will in turn block on the latch until failover
// has completed before retrying the operation.
- _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+ _amqProtocolHandler.notifyFailoverStarting();
// Since failover impacts several structures we protect them all with a single mutex. These structures
// are also in child objects of the connection. This allows us to manipulate them without affecting
// client code which runs in a separate thread.
synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
{
+ //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so
+ // we can clear the exception.
+ _amqProtocolHandler.failoverInProgress();
+
// We switch in a new state manager temporarily so that the interaction to get to the "connection open"
// state works, without us having to terminate any existing "state waiters". We could theoretically
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 9f2ba1d84d..3932b098cd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -153,6 +153,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
+
+ /** The last failover exception that occured */
+ private FailoverException _lastFailoverException;
+
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
@@ -419,6 +423,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ public void notifyFailoverStarting()
+ {
+ // Set the last exception in the sync block to ensure the ordering with add.
+ // either this gets done and the add does the ml.error
+ // or the add completes first and the iterator below will do ml.error
+ synchronized (_frameListeners)
+ {
+ _lastFailoverException = new FailoverException("Failing over about to start");
+ }
+
+ propagateExceptionToWaiters(_lastFailoverException);
+ }
+
+ public void failoverInProgress()
+ {
+ _lastFailoverException = null;
+ }
+
private static int _messageReceivedCount;
public void messageReceived(IoSession session, Object message) throws Exception
@@ -471,11 +493,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
try
- {
+ {
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
if (!_frameListeners.isEmpty())
{
+ //This iterator is safe from the error state as the frame listeners always add before they send so their
+ // will be ready and waiting for this response.
Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
@@ -592,7 +616,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
try
{
- _frameListeners.add(listener);
+ synchronized (_frameListeners)
+ {
+ if (_lastFailoverException != null)
+ {
+ throw _lastFailoverException;
+ }
+
+ _frameListeners.add(listener);
+ }
_protocolSession.writeFrame(frame);
AMQMethodEvent e = listener.blockForFrame(timeout);
@@ -601,10 +633,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
- catch (AMQException e)
- {
- throw e;
- }
finally
{
// If we don't removeKey the listener then no-one will