summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java128
1 files changed, 110 insertions, 18 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index e22a341205..2c10c585fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -61,6 +61,8 @@ import org.apache.qpid.transport.TransportException;
public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
+ private static final int DEFAULT_PORT = 5672;
+
/**
* This class logger.
*/
@@ -238,7 +240,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
}
- String msg = "Cannot connect to broker: " + ce.getMessage();
+ String msg = "Cannot connect to broker ("+brokerDetail+"): " + ce.getMessage();
throw new AMQException(code, msg, ce);
}
@@ -314,25 +316,39 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
@Override
public void run()
{
- try
- {
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- failoverDone.set(true);
+ boolean preFailover = _conn.firePreFailover(false);
+ if (preFailover)
+ {
+ boolean reconnected;
+ if(exc instanceof RedirectConnectionException)
+ {
+ RedirectConnectionException redirect = (RedirectConnectionException)exc;
+ reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
+ }
+ else
+ {
+ reconnected = _conn.attemptReconnection();
+ }
+ if(reconnected)
+ {
+ failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ failoverDone.set(true);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
- }
}
});
@@ -376,6 +392,58 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ exception = new RedirectConnectionException(host,knownHosts);
+
+ return false;
+ }
+
+ private boolean attemptRedirection(String host, List<Object> knownHosts)
+ {
+
+ boolean redirected = host != null && attemptRedirection(host);
+ if(knownHosts != null)
+ {
+ for(Object knownHost : knownHosts)
+ {
+ redirected = attemptRedirection(String.valueOf(knownHost));
+ if(redirected)
+ {
+ break;
+ }
+ }
+ }
+ return redirected;
+ }
+
+ private boolean attemptRedirection(String host)
+ {
+ int portIndex = host.indexOf(':');
+
+ int port;
+ if (portIndex == -1)
+ {
+ port = DEFAULT_PORT;
+ }
+ else
+ {
+ try
+ {
+ port = Integer.parseInt(host.substring(portIndex + 1));
+ }
+ catch(NumberFormatException e)
+ {
+ _logger.info("Unable to redirect to " + host + " - does not look like a valid address");
+ return false;
+ }
+ host = host.substring(0, portIndex);
+
+ }
+ return _conn.attemptReconnection(host,port,false);
+ }
+
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
if (_conn.isFailingOver())
@@ -538,4 +606,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return _qpidConnection.isMessageCompressionSupported();
}
+
+ private class RedirectConnectionException extends ConnectionException
+ {
+ private final String _host;
+ private final List<Object> _knownHosts;
+
+ public RedirectConnectionException(final String host,
+ final List<Object> knownHosts)
+ {
+ super("Connection redirected to " + host + " alternates " + knownHosts);
+ _host = host;
+ _knownHosts = knownHosts;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public List<Object> getKnownHosts()
+ {
+ return _knownHosts;
+ }
+ }
}