diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-03 01:56:07 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2011-03-03 01:56:07 +0000 |
commit | b56d67f7c344d6c93d7a12ecf11ccffac3101bce (patch) | |
tree | 675afc14ffaef8ea988c8a3d4d0d6e3897871df3 | |
parent | 3db640c26ba5de4ab4161fb93813c66e5e99af77 (diff) | |
download | qpid-python-b56d67f7c344d6c93d7a12ecf11ccffac3101bce.tar.gz |
QPID-3008: fix 0-10 failover mechanism properly
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20110301@1076492 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java | 99 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 3 |
2 files changed, 81 insertions, 21 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index e5a3d1d9f8..d738ecad7b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -31,15 +31,19 @@ import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; +import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; import org.apache.qpid.transport.ConnectionException; @@ -201,6 +205,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void failoverPrep() { + _logger.info("Preparing failover"); List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); for (AMQSession s : sessions) { @@ -210,6 +215,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void resubscribeSessions() throws JMSException, AMQException, FailoverException { + _logger.info("Resuming connection"); + getQpidConnection().resume(); List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size())); for (AMQSession s : sessions) @@ -255,32 +262,84 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec ConnectionClose close = exc.getClose(); if (close == null) { - _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); - - try + startFailoverThread(); + } + else + { + _conn.exceptionReceived(exc); + } + } + + + /** See {@link FailoverHandler} to see rationale for separate thread. */ + private void startFailoverThread() + { + final Thread failoverThread; + try + { + failoverThread = Threading.getThreadFactory().createThread(new Runnable() { - if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + public void run() { - _conn.failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - return; + _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); + + synchronized (_conn.getFailoverMutex()) + { + try + { + if (!_conn.firePreFailover(false)) + { + _logger.info("Failover process veto-ed by client"); + _conn.exceptionReceived(new AMQDisconnectedException("Failover was vetoed by client", null)); + return; + } + + if (_conn.attemptReconnection()) + { + _conn.failoverPrep(); + if (_conn.firePreResubscribe()) + { + _logger.info("Resubscribing on new connection"); + _conn.resubscribeSessions(); + } + else + { + _logger.info("Client vetoed automatic resubscription"); + } + + _conn.fireFailoverComplete(); + _logger.info("Connection failover completed successfully"); + } + else + { + _conn.exceptionReceived(new AMQDisconnectedException("Server closed connection and no failover was successful", null)); + } + return; + } + catch (Exception e) + { + _logger.info("Failover process failed - exception being propagated by protocol handler", e); + _conn.exceptionReceived(e); + } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); + } + } } - } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); - } + }); } - else + catch (Exception e) { - _conn.exceptionReceived(exc); + throw new RuntimeException("Failed to create thread", e); } + + failoverThread.setName("Failover"); + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.setDaemon(false); + failoverThread.start(); } public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 1f940b62f0..3b05c0ebd0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2930,6 +2930,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void resubscribeConsumers() throws AMQException { ArrayList<C> consumers = new ArrayList<C>(_consumers.values()); + _logger.info(String.format("Resubscribing consumers = %s consumers.size=%d", consumers, consumers.size())); _consumers.clear(); for (C consumer : consumers) @@ -2943,7 +2944,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void resubscribeProducers() throws AMQException { ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey + _logger.info(String.format("Resubscribing producers = %s producers.size=%d", producers, producers.size())); for (Iterator it = producers.iterator(); it.hasNext();) { P producer = (P) it.next(); |