diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java | 128 |
1 files changed, 110 insertions, 18 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index e22a341205..2c10c585fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -61,6 +61,8 @@ import org.apache.qpid.transport.TransportException; public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { + private static final int DEFAULT_PORT = 5672; + /** * This class logger. */ @@ -238,7 +240,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); } - String msg = "Cannot connect to broker: " + ce.getMessage(); + String msg = "Cannot connect to broker ("+brokerDetail+"): " + ce.getMessage(); throw new AMQException(code, msg, ce); } @@ -314,25 +316,39 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec @Override public void run() { - try - { - if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + try { - failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - failoverDone.set(true); + boolean preFailover = _conn.firePreFailover(false); + if (preFailover) + { + boolean reconnected; + if(exc instanceof RedirectConnectionException) + { + RedirectConnectionException redirect = (RedirectConnectionException)exc; + reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts()); + } + else + { + reconnected = _conn.attemptReconnection(); + } + if(reconnected) + { + failoverPrep(); + _conn.resubscribeSessions(); + _conn.fireFailoverComplete(); + failoverDone.set(true); + } + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); } - } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); - } } }); @@ -376,6 +392,58 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + exception = new RedirectConnectionException(host,knownHosts); + + return false; + } + + private boolean attemptRedirection(String host, List<Object> knownHosts) + { + + boolean redirected = host != null && attemptRedirection(host); + if(knownHosts != null) + { + for(Object knownHost : knownHosts) + { + redirected = attemptRedirection(String.valueOf(knownHost)); + if(redirected) + { + break; + } + } + } + return redirected; + } + + private boolean attemptRedirection(String host) + { + int portIndex = host.indexOf(':'); + + int port; + if (portIndex == -1) + { + port = DEFAULT_PORT; + } + else + { + try + { + port = Integer.parseInt(host.substring(portIndex + 1)); + } + catch(NumberFormatException e) + { + _logger.info("Unable to redirect to " + host + " - does not look like a valid address"); + return false; + } + host = host.substring(0, portIndex); + + } + return _conn.attemptReconnection(host,port,false); + } + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E { if (_conn.isFailingOver()) @@ -538,4 +606,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return _qpidConnection.isMessageCompressionSupported(); } + + private class RedirectConnectionException extends ConnectionException + { + private final String _host; + private final List<Object> _knownHosts; + + public RedirectConnectionException(final String host, + final List<Object> knownHosts) + { + super("Connection redirected to " + host + " alternates " + knownHosts); + _host = host; + _knownHosts = knownHosts; + } + + public String getHost() + { + return _host; + } + + public List<Object> getKnownHosts() + { + return _knownHosts; + } + } } |