diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-02-09 05:01:28 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-02-09 05:01:28 +0000 |
commit | 217b24c7e05ac60981d3e2e7c45f3ac27674d36f (patch) | |
tree | f477763062e890df147a89ec571a777a13362fa6 /qpid | |
parent | b0f5c074bb21fcdd9fd7c9680a05fb0de39f7268 (diff) | |
download | qpid-python-217b24c7e05ac60981d3e2e7c45f3ac27674d36f.tar.gz |
This is related to QPID-1649
There is some code in AMQConnectionDelegate_0_10.java related to QPID-1645
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@742260 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
5 files changed, 194 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 269937d0bd..6c9fcc0f4c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -369,7 +369,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); } - _failoverPolicy = new FailoverPolicy(connectionURL); + _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails(); if (brokerDetails.getTransport().equals(BrokerDetails.VM)) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 77860ed60c..29f1aec2f5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -154,16 +154,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0)); } + String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null? + brokerDetail.getProperty("sasl_mechs"): + System.getProperty("qpid.sasl_mechs","PLAIN"); + _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), - _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL()); + _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs); _conn._connected = true; + _conn._failoverPolicy.attainedConnection(); } catch(ProtocolVersionException pe) { return new ProtocolVersion(pe.getMajor(), pe.getMinor()); } catch (ConnectionException e) - { + { throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 4186706ade..bfd1161c90 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.jms; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.failover.FailoverExchangeMethod; import org.apache.qpid.jms.failover.FailoverMethod; import org.apache.qpid.jms.failover.FailoverRoundRobinServers; import org.apache.qpid.jms.failover.FailoverSingleServer; @@ -48,7 +50,7 @@ public class FailoverPolicy private long _lastMethodTime; private long _lastFailTime; - public FailoverPolicy(ConnectionURL connectionDetails) + public FailoverPolicy(ConnectionURL connectionDetails, AMQConnection conn) { FailoverMethod method; @@ -88,6 +90,10 @@ public class FailoverPolicy { method = new FailoverRoundRobinServers(connectionDetails); } + else if (failoverMethod.equals(FailoverMethod.FAILOVER_EXCHANGE)) + { + method = new FailoverExchangeMethod(connectionDetails, conn); + } else { try diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java new file mode 100644 index 0000000000..6f945687cf --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.failover; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQBrokerDetails; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * When using the Failover exchange a single broker is supplied in the URL. + * The connection will then connect to the cluster using the above broker details. + * Once connected, the membership details of the cluster will be obtained via + * subscribing to a queue bound to the failover exchange. + * + * The failover exchange will provide a list of broker URLs in the format "transport:ip:port" + * Out of this list we only select brokers that match the transport of the original + * broker supplied in the connection URL. + * + * Also properties defined for the original broker will be applied to all the brokers selected + * from the list. + */ + +public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class); + + /** This is not safe to use until attainConnection is called */ + private AMQConnection _conn; + + /** Protects the broker list when modifications happens */ + private Object _brokerListLock = new Object(); + + /** The session used to subscribe to failover exchange */ + private Session _ssn; + + private BrokerDetails _orginalBrokerDetail; + + public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn) + { + super(connectionDetails); + _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0); + + // This is not safe to use until attainConnection is called, as this ref will not initialized fully. + // The reason being this constructor is called inside the AMWConnection constructor. + // It would be best if we find a way to pass this ref after AMQConnection is fully initialized. + _conn = conn; + } + + private void subscribeForUpdates() throws JMSException + { + if (_ssn == null) + { + _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageConsumer cons = _ssn.createConsumer( + new AMQAnyDestination(new AMQShortString("amq.failover"), + new AMQShortString("amq.failover"), + new AMQShortString(""), + true,true,null,false, + new AMQShortString[0])); + cons.setMessageListener(this); + } + } + + public void onMessage(Message m) + { + _logger.info("Failover exchange notified cluster membership change"); + List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>(); + try + { + List<String> list = (List<String>)m.getObjectProperty("amq.failover"); + for (String brokerEntry:list) + { + String[] urls = brokerEntry.substring(5) .split(","); + // Iterate until you find the correct transport + // Need to reconsider the logic when the C++ broker supports + // SSL URLs. + for (String url:urls) + { + String[] tokens = url.split(":"); + if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport())) + { + BrokerDetails broker = new AMQBrokerDetails(); + broker.setTransport(tokens[0]); + broker.setHost(tokens[1]); + broker.setPort(Integer.parseInt(tokens[2])); + broker.setProperties(_orginalBrokerDetail.getProperties()); + broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration()); + brokerList.add(broker); + break; + } + } + } + } + catch(JMSException e) + { + _logger.error("Error parsing the message sent by failover exchange",e); + } + + synchronized (_brokerListLock) + { + _connectionDetails.setBrokerDetails(brokerList); + } + } + + public void attainedConnection() + { + super.attainedConnection(); + try + { + subscribeForUpdates(); + } + catch (JMSException e) + { + throw new RuntimeException("Unable to subscribe for cluster membership updates",e); + } + } + + public BrokerDetails getCurrentBrokerDetails() + { + synchronized (_brokerListLock) + { + return super.getCurrentBrokerDetails(); + } + } + + public BrokerDetails getNextBrokerDetails() + { + synchronized(_brokerListLock) + { + return super.getNextBrokerDetails(); + } + } + + public String methodName() + { + return "Failover Exchange"; + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + sb.append("FailoverExchange:\n"); + sb.append(super.toString()); + return sb.toString(); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java index d7ec46dea3..a921649829 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java @@ -27,6 +27,7 @@ public interface FailoverMethod { public static final String SINGLE_BROKER = "singlebroker"; public static final String ROUND_ROBIN = "roundrobin"; + public static final String FAILOVER_EXCHANGE= "failover_exchange"; public static final String RANDOM = "random"; /** * Reset the Failover to initial conditions |