summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java253
1 files changed, 253 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
new file mode 100644
index 0000000000..41ba4974ec
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
@@ -0,0 +1,253 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.failover;
+
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverRoundRobinServers implements FailoverMethod
+{
+ private static final Logger _logger = LoggerFactory.getLogger(FailoverRoundRobinServers.class);
+
+ /** The default number of times to cycle through all servers */
+ public static final int DEFAULT_CYCLE_RETRIES = 1;
+ /** The default number of times to retry each server */
+ public static final int DEFAULT_SERVER_RETRIES = 0;
+
+ /** The index into the hostDetails array of the broker to which we are connected */
+ private int _currentBrokerIndex = 0;
+
+ /** The number of times to retry connecting for each server */
+ private int _serverRetries;
+
+ /** The current number of retry attempts made */
+ private int _currentServerRetry = 0;
+
+ /** The number of times to cycle through the servers */
+ private int _cycleRetries;
+
+ /** The current number of cycles performed. */
+ private int _currentCycleRetries = 0;
+
+ /** Array of BrokerDetail used to make connections. */
+ protected ConnectionURL _connectionDetails;
+
+ public FailoverRoundRobinServers(ConnectionURL connectionDetails)
+ {
+ if (!(connectionDetails.getBrokerCount() > 0))
+ {
+ throw new IllegalArgumentException("At least one broker details must be specified.");
+ }
+
+ _connectionDetails = connectionDetails;
+
+ // There is no current broker at startup so set it to -1.
+ _currentBrokerIndex = 0;
+
+ String cycleRetries = _connectionDetails.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE);
+
+ _cycleRetries = DEFAULT_CYCLE_RETRIES;
+
+ if (cycleRetries != null)
+ {
+ try
+ {
+ _cycleRetries = Integer.parseInt(cycleRetries);
+ }
+ catch (NumberFormatException nfe)
+ {
+ _logger.warn("Cannot set cycle Retries, " + cycleRetries + " is not a number. Using default: " + DEFAULT_CYCLE_RETRIES);
+ }
+ }
+
+ _currentCycleRetries = 0;
+
+ _serverRetries = 0;
+ _currentServerRetry = 0;
+ }
+
+ public void reset()
+ {
+ _currentBrokerIndex = 0;
+ _currentCycleRetries = 0;
+ _currentServerRetry = 0;
+ }
+
+ public boolean failoverAllowed()
+ {
+ _logger.info("==== Checking failoverAllowed() ====");
+ _logger.info(toString());
+ _logger.info("====================================");
+ return ((_currentCycleRetries < _cycleRetries) || (_currentServerRetry < _serverRetries));
+ }
+
+ public void attainedConnection()
+ {
+ _currentCycleRetries = 0;
+ _currentServerRetry = 0;
+ }
+
+ public BrokerDetails getCurrentBrokerDetails()
+ {
+ return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ }
+
+ public BrokerDetails getNextBrokerDetails()
+ {
+ boolean doDelay = false;
+
+ if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
+ {
+ if (_currentServerRetry < _serverRetries)
+ {
+ _logger.info("Trying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
+ doDelay= _currentBrokerIndex != 0;
+ _currentServerRetry++;
+ }
+ else
+ {
+ _currentCycleRetries++;
+ // failed to connect to first broker
+ _currentBrokerIndex = 0;
+
+ setBroker(_connectionDetails.getBrokerDetails(_currentBrokerIndex));
+
+ // This is zero rather than -1 as we are already retrieving the details.
+ _currentServerRetry = 0;
+ }
+ // else - should force client to stop as max retries has been reached.
+ }
+ else
+ {
+ if (_currentServerRetry < _serverRetries)
+ {
+ _logger.info("Trying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
+ doDelay= _currentBrokerIndex != 0;
+
+ _currentServerRetry++;
+ }
+ else
+ {
+ _currentBrokerIndex++;
+
+ setBroker(_connectionDetails.getBrokerDetails(_currentBrokerIndex));
+ // This is zero rather than -1 as we are already retrieving the details.
+ _currentServerRetry = 0;
+ }
+ }
+
+ BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+
+ String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+ if (delayStr != null && doDelay)
+ {
+ Long delay = Long.parseLong(delayStr);
+ _logger.info("Delay between connect retries:" + delay);
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException ie)
+ {
+ return null;
+ }
+ }
+ else
+ {
+ // Only display if option not set. Not if deDelay==false.
+ if (delayStr == null)
+ {
+ _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+ }
+ }
+
+ return broker;
+ }
+
+ public void setBroker(BrokerDetails broker)
+ {
+
+ _connectionDetails.addBrokerDetails(broker);
+
+ int index = _connectionDetails.getAllBrokerDetails().indexOf(broker);
+
+ String serverRetries = broker.getProperty(BrokerDetails.OPTIONS_RETRY);
+
+ if (serverRetries != null)
+ {
+ try
+ {
+ _serverRetries = Integer.parseInt(serverRetries);
+ }
+ catch (NumberFormatException nfe)
+ {
+ _serverRetries = DEFAULT_SERVER_RETRIES;
+ }
+ }
+
+ _currentServerRetry = 0;
+ _currentBrokerIndex = index;
+ }
+
+ public void setRetries(int maxRetries)
+ {
+ _cycleRetries = maxRetries;
+ }
+
+ public String methodName()
+ {
+ return "Cycle Servers";
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("Cycle Servers:\n");
+
+ sb.append("Cycle Retries:");
+ sb.append(_cycleRetries);
+ sb.append("\nCurrent Cycle:");
+ sb.append(_currentCycleRetries);
+ sb.append("\nServer Retries:");
+ sb.append(_serverRetries);
+ sb.append("\nCurrent Retry:");
+ sb.append(_currentServerRetry);
+ sb.append("\nCurrent Broker:");
+ sb.append(_currentBrokerIndex);
+ sb.append("\n");
+
+ for (int i = 0; i < _connectionDetails.getBrokerCount(); i++)
+ {
+ if (i == _currentBrokerIndex)
+ {
+ sb.append(">");
+ }
+
+ sb.append(_connectionDetails.getBrokerDetails(i));
+ sb.append("\n");
+ }
+
+ return sb.toString();
+ }
+}