diff options
Diffstat (limited to 'trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java')
-rw-r--r-- | trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java | 317 |
1 files changed, 0 insertions, 317 deletions
diff --git a/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java deleted file mode 100644 index 960661daea..0000000000 --- a/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.jms.failover; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQBrokerDetails; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.ConnectionURL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * When using the Failover exchange a single broker is supplied in the URL. - * The connection will then connect to the cluster using the above broker details. - * Once connected, the membership details of the cluster will be obtained via - * subscribing to a queue bound to the failover exchange. - * - * The failover exchange will provide a list of broker URLs in the format "transport:ip:port" - * Out of this list we only select brokers that match the transport of the original - * broker supplied in the connection URL. - * - * Also properties defined for the original broker will be applied to all the brokers selected - * from the list. - */ - -public class FailoverExchangeMethod implements FailoverMethod, MessageListener -{ - private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class); - - /** This is not safe to use until attainConnection is called */ - private AMQConnection _conn; - - /** Protects the broker list when modifications happens */ - private Object _brokerListLock = new Object(); - - /** The session used to subscribe to failover exchange */ - private Session _ssn; - - private BrokerDetails _originalBrokerDetail; - - /** The index into the hostDetails array of the broker to which we are connected */ - private int _currentBrokerIndex = 0; - - /** The broker currently selected **/ - private BrokerDetails _currentBrokerDetail; - - /** Array of BrokerDetail used to make connections. */ - private ConnectionURL _connectionDetails; - - /** Denotes the number of failed attempts **/ - private int _failedAttemps = 0; - - public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn) - { - _connectionDetails = connectionDetails; - _originalBrokerDetail = _connectionDetails.getBrokerDetails(0); - - // This is not safe to use until attainConnection is called, as this ref will not initialized fully. - // The reason being this constructor is called inside the AMWConnection constructor. - // It would be best if we find a way to pass this ref after AMQConnection is fully initialized. - _conn = conn; - } - - private void subscribeForUpdates() throws JMSException - { - if (_ssn == null) - { - _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageConsumer cons = _ssn.createConsumer( - new AMQAnyDestination(new AMQShortString("amq.failover"), - new AMQShortString("amq.failover"), - new AMQShortString(""), - true,true,null,false, - new AMQShortString[0])); - cons.setMessageListener(this); - } - } - - public void onMessage(Message m) - { - _logger.info("Failover exchange notified cluster membership change"); - - String currentBrokerIP = ""; - try - { - currentBrokerIP = InetAddress.getByName(_currentBrokerDetail.getHost()).getHostAddress(); - } - catch(Exception e) - { - _logger.warn("Unable to resolve current broker host name",e); - } - - List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>(); - try - { - List<String> list = (List<String>)m.getObjectProperty("amq.failover"); - for (String brokerEntry:list) - { - String[] urls = brokerEntry.substring(5) .split(","); - // Iterate until you find the correct transport - // Need to reconsider the logic when the C++ broker supports - // SSL URLs. - for (String url:urls) - { - String[] tokens = url.split(":"); - if (tokens[0].equalsIgnoreCase(_originalBrokerDetail.getTransport())) - { - BrokerDetails broker = new AMQBrokerDetails(); - broker.setTransport(tokens[0]); - broker.setHost(tokens[1]); - broker.setPort(Integer.parseInt(tokens[2])); - broker.setProperties(_originalBrokerDetail.getProperties()); - broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration()); - brokerList.add(broker); - - if (currentBrokerIP.equals(broker.getHost()) && - _currentBrokerDetail.getPort() == broker.getPort()) - { - _currentBrokerIndex = brokerList.indexOf(broker); - } - - break; - } - } - } - } - catch(JMSException e) - { - _logger.error("Error parsing the message sent by failover exchange",e); - } - - synchronized (_brokerListLock) - { - _connectionDetails.setBrokerDetails(brokerList); - } - - _logger.info("============================================================"); - _logger.info("Updated cluster membership details " + _connectionDetails); - _logger.info("============================================================"); - } - - public void attainedConnection() - { - try - { - _failedAttemps = 0; - _logger.info("============================================================"); - _logger.info("Attained connection "); - _logger.info("============================================================"); - subscribeForUpdates(); - } - catch (JMSException e) - { - throw new RuntimeException("Unable to subscribe for cluster membership updates",e); - } - } - - public BrokerDetails getCurrentBrokerDetails() - { - synchronized (_brokerListLock) - { - _currentBrokerDetail = _connectionDetails.getBrokerDetails(_currentBrokerIndex); - return _currentBrokerDetail; - } - } - - public BrokerDetails getNextBrokerDetails() - { - synchronized(_brokerListLock) - { - if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1)) - { - _currentBrokerIndex = 0; - } - else - { - _currentBrokerIndex++; - } - - BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex); - - // When the broker list is updated it will include the current broker as well - // There is no point trying it again, so trying the next one. - if (_currentBrokerDetail != null && - broker.getHost().equals(_currentBrokerDetail.getHost()) && - broker.getPort() == _currentBrokerDetail.getPort()) - { - if (_connectionDetails.getBrokerCount() > 1) - { - return getNextBrokerDetails(); - } - else - { - _failedAttemps ++; - return null; - } - } - - String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY); - if (delayStr != null) - { - Long delay = Long.parseLong(delayStr); - _logger.info("Delay between connect retries:" + delay); - try - { - Thread.sleep(delay); - } - catch (InterruptedException ie) - { - return null; - } - } - else - { - _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable."); - } - - _failedAttemps ++; - _currentBrokerDetail = broker; - return broker; - } - } - - public boolean failoverAllowed() - { - // We allow to Failover provided - // our broker list is not empty and - // we haven't gone through all of them - - boolean b = _connectionDetails.getBrokerCount() > 0 && - _failedAttemps <= _connectionDetails.getBrokerCount(); - - - _logger.info("============================================================"); - _logger.info(toString()); - _logger.info("FailoverAllowed " + b); - _logger.info("============================================================"); - - return b; - } - - public void reset() - { - _failedAttemps = 0; - } - - public void setBroker(BrokerDetails broker) - { - // not sure if this method is needed - } - - public void setRetries(int maxRetries) - { - // no max retries we keep trying as long - // as we get updates - } - - public String methodName() - { - return "Failover Exchange"; - } - - public String toString() - { - StringBuffer sb = new StringBuffer(); - sb.append("FailoverExchange:\n"); - sb.append("\n Current Broker Index:"); - sb.append(_currentBrokerIndex); - sb.append("\n Failed Attempts:"); - sb.append(_failedAttemps); - sb.append("\n Orignal broker details:"); - sb.append(_originalBrokerDetail).append("\n"); - sb.append("\n -------- Broker List -----------\n"); - for (int i = 0; i < _connectionDetails.getBrokerCount(); i++) - { - if (i == _currentBrokerIndex) - { - sb.append(">"); - } - - sb.append(_connectionDetails.getBrokerDetails(i)); - sb.append("\n"); - } - sb.append("--------------------------------\n"); - return sb.toString(); - } -} |