summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java158
1 files changed, 144 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
index 6f945687cf..e05a7ab6e2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms.failover;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
@@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory;
* from the list.
*/
-public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener
+public class FailoverExchangeMethod implements FailoverMethod, MessageListener
{
private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
@@ -65,17 +66,29 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements
/** The session used to subscribe to failover exchange */
private Session _ssn;
- private BrokerDetails _orginalBrokerDetail;
+ private BrokerDetails _originalBrokerDetail;
+
+ /** The index into the hostDetails array of the broker to which we are connected */
+ private int _currentBrokerIndex = 0;
+
+ /** The broker currently selected **/
+ private BrokerDetails _currentBrokerDetail;
+
+ /** Array of BrokerDetail used to make connections. */
+ private ConnectionURL _connectionDetails;
+
+ /** Denotes the number of failed attempts **/
+ private int _failedAttemps = 0;
public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
{
- super(connectionDetails);
- _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0);
+ _connectionDetails = connectionDetails;
+ _originalBrokerDetail = _connectionDetails.getBrokerDetails(0);
// This is not safe to use until attainConnection is called, as this ref will not initialized fully.
// The reason being this constructor is called inside the AMWConnection constructor.
// It would be best if we find a way to pass this ref after AMQConnection is fully initialized.
- _conn = conn;
+ _conn = conn;
}
private void subscribeForUpdates() throws JMSException
@@ -96,6 +109,17 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements
public void onMessage(Message m)
{
_logger.info("Failover exchange notified cluster membership change");
+
+ String currentBrokerIP = "";
+ try
+ {
+ currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress();
+ }
+ catch(Exception e)
+ {
+ _logger.warn("Unable to resolve current broker host name",e);
+ }
+
List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>();
try
{
@@ -109,15 +133,22 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements
for (String url:urls)
{
String[] tokens = url.split(":");
- if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport()))
+ if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport()))
{
BrokerDetails broker = new AMQBrokerDetails();
broker.setTransport(tokens[0]);
broker.setHost(tokens[1]);
broker.setPort(Integer.parseInt(tokens[2]));
- broker.setProperties(_orginalBrokerDetail.getProperties());
- broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration());
+ broker.setProperties(_originalBrokerDetail.getProperties());
+ broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
brokerList.add(broker);
+
+ if (currentBrokerIP.equals(broker.getHost()) &&
+ _currentBrokerDetail.getPort() == broker.getPort())
+ {
+ _currentBrokerIndex = brokerList.indexOf(broker);
+ }
+
break;
}
}
@@ -132,13 +163,20 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements
{
_connectionDetails.setBrokerDetails(brokerList);
}
+
+ _logger.info("============================================================");
+ _logger.info("Updated cluster membership details " + _connectionDetails);
+ _logger.info("============================================================");
}
public void attainedConnection()
{
- super.attainedConnection();
try
{
+ _failedAttemps = 0;
+ _logger.info("============================================================");
+ _logger.info("Attained connection ");
+ _logger.info("============================================================");
subscribeForUpdates();
}
catch (JMSException e)
@@ -151,17 +189,92 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements
{
synchronized (_brokerListLock)
{
- return super.getCurrentBrokerDetails();
+ return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
}
- }
-
+ }
+
public BrokerDetails getNextBrokerDetails()
{
synchronized(_brokerListLock)
{
- return super.getNextBrokerDetails();
+ if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
+ {
+ _currentBrokerIndex = 0;
+ }
+ else
+ {
+ _currentBrokerIndex++;
+ }
+
+ BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+
+ // When the broker list is updated it will include the current broker as well
+ // There is no point trying it again, so trying the next one.
+ if (_currentBrokerDetail != null &&
+ broker.getHost().equals(_currentBrokerDetail.getHost()) &&
+ broker.getPort() == _currentBrokerDetail.getPort())
+ {
+ return getNextBrokerDetails();
+ }
+
+ String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+ if (delayStr != null)
+ {
+ Long delay = Long.parseLong(delayStr);
+ _logger.info("Delay between connect retries:" + delay);
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException ie)
+ {
+ return null;
+ }
+ }
+ else
+ {
+ _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+ }
+
+ _failedAttemps ++;
+ _currentBrokerDetail = broker;
+ return broker;
}
}
+
+ public boolean failoverAllowed()
+ {
+ // We allow to Failover provided
+ // our broker list is not empty and
+ // we haven't gone through all of them
+
+ boolean b = _connectionDetails.getBrokerCount() > 0 &&
+ _failedAttemps <= _connectionDetails.getBrokerCount();
+
+
+ _logger.info("============================================================");
+ _logger.info(toString());
+ _logger.info("FailoverAllowed " + b);
+ _logger.info("============================================================");
+
+ return b;
+ }
+
+ public void reset()
+ {
+ _failedAttemps = 0;
+ }
+
+ public void setBroker(BrokerDetails broker)
+ {
+ // not sure if this method is needed
+ }
+
+ public void setRetries(int maxRetries)
+ {
+ // no max retries we keep trying as long
+ // as we get updates
+ }
public String methodName()
{
@@ -172,7 +285,24 @@ public class FailoverExchangeMethod extends FailoverRoundRobinServers implements
{
StringBuffer sb = new StringBuffer();
sb.append("FailoverExchange:\n");
- sb.append(super.toString());
+ sb.append("\n Current Broker Index:");
+ sb.append(_currentBrokerIndex);
+ sb.append("\n Failed Attempts:");
+ sb.append(_failedAttemps);
+ sb.append("\n Orignal broker details:");
+ sb.append(_originalBrokerDetail).append("\n");
+ sb.append("\n -------- Broker List -----------\n");
+ for (int i = 0; i < _connectionDetails.getBrokerCount(); i++)
+ {
+ if (i == _currentBrokerIndex)
+ {
+ sb.append(">");
+ }
+
+ sb.append(_connectionDetails.getBrokerDetails(i));
+ sb.append("\n");
+ }
+ sb.append("--------------------------------\n");
return sb.toString();
}
}