summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2011-03-03 01:56:07 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2011-03-03 01:56:07 +0000
commitb56d67f7c344d6c93d7a12ecf11ccffac3101bce (patch)
tree675afc14ffaef8ea988c8a3d4d0d6e3897871df3
parent3db640c26ba5de4ab4161fb93813c66e5e99af77 (diff)
downloadqpid-python-b56d67f7c344d6c93d7a12ecf11ccffac3101bce.tar.gz
QPID-3008: fix 0-10 failover mechanism properly
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20110301@1076492 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java99
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java3
2 files changed, 81 insertions, 21 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index e5a3d1d9f8..d738ecad7b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -31,15 +31,19 @@ import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
@@ -201,6 +205,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void failoverPrep()
{
+ _logger.info("Preparing failover");
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
for (AMQSession s : sessions)
{
@@ -210,6 +215,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
+ _logger.info("Resuming connection");
+ getQpidConnection().resume();
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
_logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
for (AMQSession s : sessions)
@@ -255,32 +262,84 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
ConnectionClose close = exc.getClose();
if (close == null)
{
- _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
-
- try
+ startFailoverThread();
+ }
+ else
+ {
+ _conn.exceptionReceived(exc);
+ }
+ }
+
+
+ /** See {@link FailoverHandler} to see rationale for separate thread. */
+ private void startFailoverThread()
+ {
+ final Thread failoverThread;
+ try
+ {
+ failoverThread = Threading.getThreadFactory().createThread(new Runnable()
{
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ public void run()
{
- _conn.failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- return;
+ _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
+
+ synchronized (_conn.getFailoverMutex())
+ {
+ try
+ {
+ if (!_conn.firePreFailover(false))
+ {
+ _logger.info("Failover process veto-ed by client");
+ _conn.exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client", null));
+ return;
+ }
+
+ if (_conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ if (_conn.firePreResubscribe())
+ {
+ _logger.info("Resubscribing on new connection");
+ _conn.resubscribeSessions();
+ }
+ else
+ {
+ _logger.info("Client vetoed automatic resubscription");
+ }
+
+ _conn.fireFailoverComplete();
+ _logger.info("Connection failover completed successfully");
+ }
+ else
+ {
+ _conn.exceptionReceived(new AMQDisconnectedException("Server closed connection and no failover was successful", null));
+ }
+ return;
+ }
+ catch (Exception e)
+ {
+ _logger.info("Failover process failed - exception being propagated by protocol handler", e);
+ _conn.exceptionReceived(e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
+ }
+ }
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
- }
+ });
}
- else
+ catch (Exception e)
{
- _conn.exceptionReceived(exc);
+ throw new RuntimeException("Failed to create thread", e);
}
+
+ failoverThread.setName("Failover");
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.setDaemon(false);
+ failoverThread.start();
}
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 1f940b62f0..3b05c0ebd0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2930,6 +2930,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void resubscribeConsumers() throws AMQException
{
ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
+ _logger.info(String.format("Resubscribing consumers = %s consumers.size=%d", consumers, consumers.size()));
_consumers.clear();
for (C consumer : consumers)
@@ -2943,7 +2944,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void resubscribeProducers() throws AMQException
{
ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
+ _logger.info(String.format("Resubscribing producers = %s producers.size=%d", producers, producers.size()));
for (Iterator it = producers.iterator(); it.hasNext();)
{
P producer = (P) it.next();