diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client')
9 files changed, 244 insertions, 68 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index bde20d0550..9d9278b74d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -20,18 +20,18 @@ */ package org.apache.qpid.client; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + public class AMQBrokerDetails implements BrokerDetails, Serializable { private static final long serialVersionUID = 8450786374975932890L; @@ -42,6 +42,14 @@ public class AMQBrokerDetails implements BrokerDetails, Serializable private Map<String, String> _options = new HashMap<String, String>(); + public AMQBrokerDetails(BrokerDetails details) + { + _host = details.getHost(); + _port = details.getPort(); + _transport = details.getTransport(); + _options = new HashMap<>(details.getProperties()); + } + public AMQBrokerDetails(){} public AMQBrokerDetails(String url) throws URLSyntaxException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 8e7b5b90d8..ec60bd2914 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -62,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.client.failover.ConnectionRedirectException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -462,9 +463,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else if (!isConnected()) { - retryAllowed = _failoverPolicy.failoverAllowed(); - brokerDetails = _failoverPolicy.getNextBrokerDetails(); - _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + if(connectionException instanceof ConnectionRedirectException) + { + ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException; + retryAllowed = true; + brokerDetails = new AMQBrokerDetails(brokerDetails); + brokerDetails.setHost(redirect.getHost()); + brokerDetails.setPort(redirect.getPort()); + _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + + } + else + { + retryAllowed = _failoverPolicy.failoverAllowed(); + brokerDetails = _failoverPolicy.getNextBrokerDetails(); + _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + } } } @@ -599,9 +613,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _virtualHost = virtualHost; } - public boolean attemptReconnection(String host, int port) + public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure) { - BrokerDetails bd = new AMQBrokerDetails(host, port); + BrokerDetails bd = new AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails()); + bd.setHost(host); + bd.setPort(port); _failoverPolicy.setBroker(bd); @@ -618,10 +634,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.info("Unable to connect to broker at " + bd); } - attemptReconnection(); + return useFailoverConfigOnFailure && attemptReconnection(); } - return false; } public boolean attemptReconnection() @@ -629,32 +644,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect BrokerDetails broker = null; while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { - try + if (attemptConnection(broker)) { - makeBrokerConnection(broker); return true; } - catch (Exception e) + } + + // connection unsuccessful + return false; + } + + private boolean attemptConnection(final BrokerDetails broker) + { + try + { + makeBrokerConnection(broker); + return true; + } + catch (Exception e) + { + if (!(e instanceof AMQException)) { - if (!(e instanceof AMQException)) + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); - } + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); } - else + } + else + { + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info(e.getMessage() + ":Unable to connect to broker at " - + _failoverPolicy.getCurrentBrokerDetails()); - } + _logger.info(e.getMessage() + ":Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails()); } } } - - // connection unsuccessful return false; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index e22a341205..2c10c585fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -61,6 +61,8 @@ import org.apache.qpid.transport.TransportException; public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { + private static final int DEFAULT_PORT = 5672; + /** * This class logger. */ @@ -238,7 +240,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); } - String msg = "Cannot connect to broker: " + ce.getMessage(); + String msg = "Cannot connect to broker ("+brokerDetail+"): " + ce.getMessage(); throw new AMQException(code, msg, ce); } @@ -314,25 +316,39 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec @Override public void run() { - try - { - if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + try { - failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - failoverDone.set(true); + boolean preFailover = _conn.firePreFailover(false); + if (preFailover) + { + boolean reconnected; + if(exc instanceof RedirectConnectionException) + { + RedirectConnectionException redirect = (RedirectConnectionException)exc; + reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts()); + } + else + { + reconnected = _conn.attemptReconnection(); + } + if(reconnected) + { + failoverPrep(); + _conn.resubscribeSessions(); + _conn.fireFailoverComplete(); + failoverDone.set(true); + } + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); } - } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); - } } }); @@ -376,6 +392,58 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + exception = new RedirectConnectionException(host,knownHosts); + + return false; + } + + private boolean attemptRedirection(String host, List<Object> knownHosts) + { + + boolean redirected = host != null && attemptRedirection(host); + if(knownHosts != null) + { + for(Object knownHost : knownHosts) + { + redirected = attemptRedirection(String.valueOf(knownHost)); + if(redirected) + { + break; + } + } + } + return redirected; + } + + private boolean attemptRedirection(String host) + { + int portIndex = host.indexOf(':'); + + int port; + if (portIndex == -1) + { + port = DEFAULT_PORT; + } + else + { + try + { + port = Integer.parseInt(host.substring(portIndex + 1)); + } + catch(NumberFormatException e) + { + _logger.info("Unable to redirect to " + host + " - does not look like a valid address"); + return false; + } + host = host.substring(0, portIndex); + + } + return _conn.attemptReconnection(host,port,false); + } + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E { if (_conn.isFailingOver()) @@ -538,4 +606,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return _qpidConnection.isMessageCompressionSupported(); } + + private class RedirectConnectionException extends ConnectionException + { + private final String _host; + private final List<Object> _knownHosts; + + public RedirectConnectionException(final String host, + final List<Object> knownHosts) + { + super("Connection redirected to " + host + " alternates " + knownHosts); + _host = host; + _knownHosts = knownHosts; + } + + public String getHost() + { + return _host; + } + + public List<Object> getKnownHosts() + { + return _knownHosts; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java new file mode 100644 index 0000000000..78efdb4317 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.failover; + +import org.apache.qpid.AMQException; + +public class ConnectionRedirectException extends AMQException +{ + private final String _host; + private final int _port; + + public ConnectionRedirectException(final String host, final int port) + { + super("Redirecting to " + host + ":" + port); + _host = host; + _port = port; + } + + public String getHost() + { + return _host; + } + + public int getPort() + { + return _port; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 315e3c4a3f..c9566db68c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.failover; +import java.util.concurrent.CountDownLatch; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +30,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; -import java.util.concurrent.CountDownLatch; - /** * FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the * class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport @@ -168,7 +168,7 @@ public class FailoverHandler implements Runnable // if _host has value then we are performing a redirect. if (_host != null) { - failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port); + failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port, true); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java index 0ccb9b72b1..ee7e2bf567 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.client.handler; +import java.nio.ByteBuffer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.ConnectionRedirectException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionRedirectBody; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody> { @@ -65,7 +70,21 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener } - session.failover(host, port); + session.notifyError(new ConnectionRedirectException(host,port)); + + Sender<ByteBuffer> sender = session.getSender(); + + // Close the open TCP connection + try + { + sender.close(); + } + catch(TransportException e) + { + //Ignore, they are already logged by the Sender and this + //is a connection-close being processed by the IoReceiver + //which will as it closes initiate failover if necessary. + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c2582accdf..c1f5e4cd7f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -794,14 +794,6 @@ public class AMQProtocolHandler implements ProtocolEngine return _writtenBytes; } - public void failover(String host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - startFailoverThread(); - } - public void blockUntilNotFailingOver() throws InterruptedException { synchronized(_failoverLatchChange) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index e5765ee00f..0fd3e278d3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -387,11 +387,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return _protocolHandler.getSender(); } - public void failover(String host, int port) - { - _protocolHandler.failover(host, port); - } - protected AMQShortString generateQueueName() { int id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index fab0bcd71f..a44b6a1ff3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.state; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +33,6 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; - /** * The state manager is responsible for managing the state of the protocol session. * <p> |