summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java70
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java128
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java46
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java21
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java2
10 files changed, 245 insertions, 69 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index bde20d0550..9d9278b74d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -20,18 +20,18 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
public class AMQBrokerDetails implements BrokerDetails, Serializable
{
private static final long serialVersionUID = 8450786374975932890L;
@@ -42,6 +42,14 @@ public class AMQBrokerDetails implements BrokerDetails, Serializable
private Map<String, String> _options = new HashMap<String, String>();
+ public AMQBrokerDetails(BrokerDetails details)
+ {
+ _host = details.getHost();
+ _port = details.getPort();
+ _transport = details.getTransport();
+ _options = new HashMap<>(details.getProperties());
+ }
+
public AMQBrokerDetails(){}
public AMQBrokerDetails(String url) throws URLSyntaxException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 8e7b5b90d8..ec60bd2914 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -62,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.failover.ConnectionRedirectException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -462,9 +463,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else if (!isConnected())
{
- retryAllowed = _failoverPolicy.failoverAllowed();
- brokerDetails = _failoverPolicy.getNextBrokerDetails();
- _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
+ if(connectionException instanceof ConnectionRedirectException)
+ {
+ ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException;
+ retryAllowed = true;
+ brokerDetails = new AMQBrokerDetails(brokerDetails);
+ brokerDetails.setHost(redirect.getHost());
+ brokerDetails.setPort(redirect.getPort());
+ _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
+
+ }
+ else
+ {
+ retryAllowed = _failoverPolicy.failoverAllowed();
+ brokerDetails = _failoverPolicy.getNextBrokerDetails();
+ _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
+ }
}
}
@@ -599,9 +613,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_virtualHost = virtualHost;
}
- public boolean attemptReconnection(String host, int port)
+ public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure)
{
- BrokerDetails bd = new AMQBrokerDetails(host, port);
+ BrokerDetails bd = new AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
+ bd.setHost(host);
+ bd.setPort(port);
_failoverPolicy.setBroker(bd);
@@ -618,10 +634,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.info("Unable to connect to broker at " + bd);
}
- attemptReconnection();
+ return useFailoverConfigOnFailure && attemptReconnection();
}
- return false;
}
public boolean attemptReconnection()
@@ -629,32 +644,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
BrokerDetails broker = null;
while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
{
- try
+ if (attemptConnection(broker))
{
- makeBrokerConnection(broker);
return true;
}
- catch (Exception e)
+ }
+
+ // connection unsuccessful
+ return false;
+ }
+
+ private boolean attemptConnection(final BrokerDetails broker)
+ {
+ try
+ {
+ makeBrokerConnection(broker);
+ return true;
+ }
+ catch (Exception e)
+ {
+ if (!(e instanceof AMQException))
{
- if (!(e instanceof AMQException))
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
- }
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
}
- else
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info(e.getMessage() + ":Unable to connect to broker at "
- + _failoverPolicy.getCurrentBrokerDetails());
- }
+ _logger.info(e.getMessage() + ":Unable to connect to broker at "
+ + _failoverPolicy.getCurrentBrokerDetails());
}
}
}
-
- // connection unsuccessful
return false;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index e22a341205..2c10c585fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -61,6 +61,8 @@ import org.apache.qpid.transport.TransportException;
public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
+ private static final int DEFAULT_PORT = 5672;
+
/**
* This class logger.
*/
@@ -238,7 +240,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
}
- String msg = "Cannot connect to broker: " + ce.getMessage();
+ String msg = "Cannot connect to broker ("+brokerDetail+"): " + ce.getMessage();
throw new AMQException(code, msg, ce);
}
@@ -314,25 +316,39 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
@Override
public void run()
{
- try
- {
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- failoverDone.set(true);
+ boolean preFailover = _conn.firePreFailover(false);
+ if (preFailover)
+ {
+ boolean reconnected;
+ if(exc instanceof RedirectConnectionException)
+ {
+ RedirectConnectionException redirect = (RedirectConnectionException)exc;
+ reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
+ }
+ else
+ {
+ reconnected = _conn.attemptReconnection();
+ }
+ if(reconnected)
+ {
+ failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ failoverDone.set(true);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
- }
}
});
@@ -376,6 +392,58 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ exception = new RedirectConnectionException(host,knownHosts);
+
+ return false;
+ }
+
+ private boolean attemptRedirection(String host, List<Object> knownHosts)
+ {
+
+ boolean redirected = host != null && attemptRedirection(host);
+ if(knownHosts != null)
+ {
+ for(Object knownHost : knownHosts)
+ {
+ redirected = attemptRedirection(String.valueOf(knownHost));
+ if(redirected)
+ {
+ break;
+ }
+ }
+ }
+ return redirected;
+ }
+
+ private boolean attemptRedirection(String host)
+ {
+ int portIndex = host.indexOf(':');
+
+ int port;
+ if (portIndex == -1)
+ {
+ port = DEFAULT_PORT;
+ }
+ else
+ {
+ try
+ {
+ port = Integer.parseInt(host.substring(portIndex + 1));
+ }
+ catch(NumberFormatException e)
+ {
+ _logger.info("Unable to redirect to " + host + " - does not look like a valid address");
+ return false;
+ }
+ host = host.substring(0, portIndex);
+
+ }
+ return _conn.attemptReconnection(host,port,false);
+ }
+
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
if (_conn.isFailingOver())
@@ -538,4 +606,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return _qpidConnection.isMessageCompressionSupported();
}
+
+ private class RedirectConnectionException extends ConnectionException
+ {
+ private final String _host;
+ private final List<Object> _knownHosts;
+
+ public RedirectConnectionException(final String host,
+ final List<Object> knownHosts)
+ {
+ super("Connection redirected to " + host + " alternates " + knownHosts);
+ _host = host;
+ _knownHosts = knownHosts;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public List<Object> getKnownHosts()
+ {
+ return _knownHosts;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java
new file mode 100644
index 0000000000..78efdb4317
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.qpid.AMQException;
+
+public class ConnectionRedirectException extends AMQException
+{
+ private final String _host;
+ private final int _port;
+
+ public ConnectionRedirectException(final String host, final int port)
+ {
+ super("Redirecting to " + host + ":" + port);
+ _host = host;
+ _port = port;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 315e3c4a3f..c9566db68c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.failover;
+import java.util.concurrent.CountDownLatch;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,8 +30,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
-import java.util.concurrent.CountDownLatch;
-
/**
* FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the
* class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport
@@ -168,7 +168,7 @@ public class FailoverHandler implements Runnable
// if _host has value then we are performing a redirect.
if (_host != null)
{
- failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port);
+ failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port, true);
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
index 0ccb9b72b1..ee7e2bf567 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.client.handler;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.ConnectionRedirectException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ConnectionRedirectBody;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody>
{
@@ -65,7 +70,21 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
}
- session.failover(host, port);
+ session.notifyError(new ConnectionRedirectException(host,port));
+
+ Sender<ByteBuffer> sender = session.getSender();
+
+ // Close the open TCP connection
+ try
+ {
+ sender.close();
+ }
+ catch(TransportException e)
+ {
+ //Ignore, they are already logged by the Sender and this
+ //is a connection-close being processed by the IoReceiver
+ //which will as it closes initiate failover if necessary.
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c2582accdf..c1f5e4cd7f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -794,14 +794,6 @@ public class AMQProtocolHandler implements ProtocolEngine
return _writtenBytes;
}
- public void failover(String host, int port)
- {
- _failoverHandler.setHost(host);
- _failoverHandler.setPort(port);
- // see javadoc for FailoverHandler to see rationale for separate thread
- startFailoverThread();
- }
-
public void blockUntilNotFailingOver() throws InterruptedException
{
synchronized(_failoverLatchChange)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index e5765ee00f..0fd3e278d3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -387,11 +387,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return _protocolHandler.getSender();
}
- public void failover(String host, int port)
- {
- _protocolHandler.failover(host, port);
- }
-
protected AMQShortString generateQueueName()
{
int id;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index fab0bcd71f..a44b6a1ff3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.state;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,10 +33,6 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
/**
* The state manager is responsible for managing the state of the protocol session.
* <p>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 808b2781c4..9faa58f11d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -98,7 +98,7 @@ public interface BrokerDetails
/**
* Sets the properties associated with this connection
*
- * @param props the new p[roperties.
+ * @param props the new properties.
*/
public void setProperties(Map<String,String> props);