diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java | 158 |
1 files changed, 144 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java index 6f945687cf..e05a7ab6e2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.jms.failover; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; @@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory; * from the list. */ -public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener +public class FailoverExchangeMethod implements FailoverMethod, MessageListener { private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class); @@ -65,17 +66,29 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements /** The session used to subscribe to failover exchange */ private Session _ssn; - private BrokerDetails _orginalBrokerDetail; + private BrokerDetails _originalBrokerDetail; + + /** The index into the hostDetails array of the broker to which we are connected */ + private int _currentBrokerIndex = 0; + + /** The broker currently selected **/ + private BrokerDetails _currentBrokerDetail; + + /** Array of BrokerDetail used to make connections. */ + private ConnectionURL _connectionDetails; + + /** Denotes the number of failed attempts **/ + private int _failedAttemps = 0; public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn) { - super(connectionDetails); - _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0); + _connectionDetails = connectionDetails; + _originalBrokerDetail = _connectionDetails.getBrokerDetails(0); // This is not safe to use until attainConnection is called, as this ref will not initialized fully. // The reason being this constructor is called inside the AMWConnection constructor. // It would be best if we find a way to pass this ref after AMQConnection is fully initialized. - _conn = conn; + _conn = conn; } private void subscribeForUpdates() throws JMSException @@ -96,6 +109,17 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements public void onMessage(Message m) { _logger.info("Failover exchange notified cluster membership change"); + + String currentBrokerIP = ""; + try + { + currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress(); + } + catch(Exception e) + { + _logger.warn("Unable to resolve current broker host name",e); + } + List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>(); try { @@ -109,15 +133,22 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements for (String url:urls) { String[] tokens = url.split(":"); - if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport())) + if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport())) { BrokerDetails broker = new AMQBrokerDetails(); broker.setTransport(tokens[0]); broker.setHost(tokens[1]); broker.setPort(Integer.parseInt(tokens[2])); - broker.setProperties(_orginalBrokerDetail.getProperties()); - broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration()); + broker.setProperties(_originalBrokerDetail.getProperties()); + broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration()); brokerList.add(broker); + + if (currentBrokerIP.equals(broker.getHost()) && + _currentBrokerDetail.getPort() == broker.getPort()) + { + _currentBrokerIndex = brokerList.indexOf(broker); + } + break; } } @@ -132,13 +163,20 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements { _connectionDetails.setBrokerDetails(brokerList); } + + _logger.info("============================================================"); + _logger.info("Updated cluster membership details " + _connectionDetails); + _logger.info("============================================================"); } public void attainedConnection() { - super.attainedConnection(); try { + _failedAttemps = 0; + _logger.info("============================================================"); + _logger.info("Attained connection "); + _logger.info("============================================================"); subscribeForUpdates(); } catch (JMSException e) @@ -151,17 +189,92 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements { synchronized (_brokerListLock) { - return super.getCurrentBrokerDetails(); + return _connectionDetails.getBrokerDetails(_currentBrokerIndex); } - } - + } + public BrokerDetails getNextBrokerDetails() { synchronized(_brokerListLock) { - return super.getNextBrokerDetails(); + if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1)) + { + _currentBrokerIndex = 0; + } + else + { + _currentBrokerIndex++; + } + + BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + + // When the broker list is updated it will include the current broker as well + // There is no point trying it again, so trying the next one. + if (_currentBrokerDetail != null && + broker.getHost().equals(_currentBrokerDetail.getHost()) && + broker.getPort() == _currentBrokerDetail.getPort()) + { + return getNextBrokerDetails(); + } + + String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); + if (delayStr != null) + { + Long delay = Long.parseLong(delayStr); + _logger.info("Delay between connect retries:" + delay); + try + { + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + return null; + } + } + else + { + _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable."); + } + + _failedAttemps ++; + _currentBrokerDetail = broker; + return broker; } } + + public boolean failoverAllowed() + { + // We allow to Failover provided + // our broker list is not empty and + // we haven't gone through all of them + + boolean b = _connectionDetails.getBrokerCount() > 0 && + _failedAttemps <= _connectionDetails.getBrokerCount(); + + + _logger.info("============================================================"); + _logger.info(toString()); + _logger.info("FailoverAllowed " + b); + _logger.info("============================================================"); + + return b; + } + + public void reset() + { + _failedAttemps = 0; + } + + public void setBroker(BrokerDetails broker) + { + // not sure if this method is needed + } + + public void setRetries(int maxRetries) + { + // no max retries we keep trying as long + // as we get updates + } public String methodName() { @@ -172,7 +285,24 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements { StringBuffer sb = new StringBuffer(); sb.append("FailoverExchange:\n"); - sb.append(super.toString()); + sb.append("\n Current Broker Index:"); + sb.append(_currentBrokerIndex); + sb.append("\n Failed Attempts:"); + sb.append(_failedAttemps); + sb.append("\n Orignal broker details:"); + sb.append(_originalBrokerDetail).append("\n"); + sb.append("\n -------- Broker List -----------\n"); + for (int i = 0; i < _connectionDetails.getBrokerCount(); i++) + { + if (i == _currentBrokerIndex) + { + sb.append(">"); + } + + sb.append(_connectionDetails.getBrokerDetails(i)); + sb.append("\n"); + } + sb.append("--------------------------------\n"); return sb.toString(); } } |