diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java | 320 |
1 files changed, 320 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java new file mode 100644 index 0000000000..ef30f2adbc --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -0,0 +1,320 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.failover; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Connection; +import org.apache.qpid.jms.ConnectionURL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * When using the Failover exchange a single broker is supplied in the URL. + * The connection will then connect to the cluster using the above broker details. + * Once connected, the membership details of the cluster will be obtained via + * subscribing to a queue bound to the failover exchange. + * + * The failover exchange will provide a list of broker URLs in the format "transport:ip:port" + * Out of this list we only select brokers that match the transport of the original + * broker supplied in the connection URL. + * + * Also properties defined for the original broker will be applied to all the brokers selected + * from the list. + */ + +public class FailoverExchangeMethod implements FailoverMethod, MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class); + + /** This is not safe to use until attainConnection is called */ + private Connection _conn; + + /** Protects the broker list when modifications happens */ + private Object _brokerListLock = new Object(); + + /** The session used to subscribe to failover exchange */ + private Session _ssn; + + private BrokerDetails _originalBrokerDetail; + + /** The index into the hostDetails array of the broker to which we are connected */ + private int _currentBrokerIndex = 0; + + /** The broker currently selected **/ + private BrokerDetails _currentBrokerDetail; + + /** Array of BrokerDetail used to make connections. */ + private ConnectionURL _connectionDetails; + + /** Denotes the number of failed attempts **/ + private int _failedAttemps = 0; + + public FailoverExchangeMethod(ConnectionURL connectionDetails, Connection conn) + { + _connectionDetails = connectionDetails; + _originalBrokerDetail = _connectionDetails.getBrokerDetails(0); + + // This is not safe to use until attainConnection is called, as this ref will not initialized fully. + // The reason being this constructor is called inside the AMWConnection constructor. + // It would be best if we find a way to pass this ref after AMQConnection is fully initialized. + _conn = conn; + } + + private void subscribeForUpdates() throws JMSException + { + if (_ssn == null) + { + _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageConsumer cons = _ssn.createConsumer( + new AMQAnyDestination(new AMQShortString("amq.failover"), + new AMQShortString("amq.failover"), + new AMQShortString(""), + true,true,null,false, + new AMQShortString[0])); + cons.setMessageListener(this); + } + } + + public void onMessage(Message m) + { + _logger.info("Failover exchange notified cluster membership change"); + + String currentBrokerIP = ""; + try + { + currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress(); + } + catch(Exception e) + { + _logger.warn("Unable to resolve current broker host name",e); + } + + List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>(); + try + { + List<String> list = (List<String>)m.getObjectProperty("amq.failover"); + for (String brokerEntry:list) + { + String[] urls = brokerEntry.substring(5) .split(","); + // Iterate until you find the correct transport + // Need to reconsider the logic when the C++ broker supports + // SSL URLs. + for (String url:urls) + { + String[] tokens = url.split(":"); + if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport())) + { + BrokerDetails broker = new AMQBrokerDetails(); + broker.setTransport(tokens[0]); + broker.setHost(tokens[1]); + broker.setPort(Integer.parseInt(tokens[2])); + broker.setProperties(_originalBrokerDetail.getProperties()); + broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration()); + brokerList.add(broker); + + if (currentBrokerIP.equals(broker.getHost()) && + _currentBrokerDetail.getPort() == broker.getPort()) + { + _currentBrokerIndex = brokerList.indexOf(broker); + } + + break; + } + } + } + } + catch(JMSException e) + { + _logger.error("Error parsing the message sent by failover exchange",e); + } + + synchronized (_brokerListLock) + { + _connectionDetails.setBrokerDetails(brokerList); + } + + _logger.info("============================================================"); + _logger.info("Updated cluster membership details " + _connectionDetails); + _logger.info("============================================================"); + } + + public void attainedConnection() + { + try + { + _failedAttemps = 0; + _logger.info("============================================================"); + _logger.info("Attained connection "); + _logger.info("============================================================"); + subscribeForUpdates(); + } + catch (JMSException e) + { + throw new RuntimeException("Unable to subscribe for cluster membership updates",e); + } + } + + public BrokerDetails getCurrentBrokerDetails() + { + synchronized (_brokerListLock) + { + _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + return _currentBrokerDetail; + } + } + + public BrokerDetails getNextBrokerDetails() + { + BrokerDetails broker = null; + + synchronized(_brokerListLock) + { + if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1)) + { + _currentBrokerIndex = 0; + } + else + { + _currentBrokerIndex++; + } + + broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + + // When the broker list is updated it will include the current broker as well + // There is no point trying it again, so trying the next one. + if (_currentBrokerDetail != null && + broker.getHost().equals(_currentBrokerDetail.getHost()) && + broker.getPort() == _currentBrokerDetail.getPort()) + { + if (_connectionDetails.getBrokerCount() > 1) + { + return getNextBrokerDetails(); + } + else + { + _failedAttemps ++; + return null; + } + } + } + + String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); + if (delayStr != null) + { + Long delay = Long.parseLong(delayStr); + _logger.info("Delay between connect retries:" + delay); + try + { + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + return null; + } + } + else + { + _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable."); + } + + _failedAttemps ++; + _currentBrokerDetail = broker; + + return broker; + } + + public boolean failoverAllowed() + { + // We allow to Failover provided + // our broker list is not empty and + // we haven't gone through all of them + + boolean b = _connectionDetails.getBrokerCount() > 0 && + _failedAttemps <= _connectionDetails.getBrokerCount(); + + + _logger.info("============================================================"); + _logger.info(toString()); + _logger.info("FailoverAllowed " + b); + _logger.info("============================================================"); + + return b; + } + + public void reset() + { + _failedAttemps = 0; + } + + public void setBroker(BrokerDetails broker) + { + // not sure if this method is needed + } + + public void setRetries(int maxRetries) + { + // no max retries we keep trying as long + // as we get updates + } + + public String methodName() + { + return "Failover Exchange"; + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + sb.append("FailoverExchange:\n"); + sb.append("\n Current Broker Index:"); + sb.append(_currentBrokerIndex); + sb.append("\n Failed Attempts:"); + sb.append(_failedAttemps); + sb.append("\n Orignal broker details:"); + sb.append(_originalBrokerDetail).append("\n"); + sb.append("\n -------- Broker List -----------\n"); + for (int i = 0; i < _connectionDetails.getBrokerCount(); i++) + { + if (i == _currentBrokerIndex) + { + sb.append(">"); + } + + sb.append(_connectionDetails.getBrokerDetails(i)); + sb.append("\n"); + } + sb.append("--------------------------------\n"); + return sb.toString(); + } +} |