summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2009-02-09 05:01:28 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2009-02-09 05:01:28 +0000
commit217b24c7e05ac60981d3e2e7c45f3ac27674d36f (patch)
treef477763062e890df147a89ec571a777a13362fa6
parentb0f5c074bb21fcdd9fd7c9680a05fb0de39f7268 (diff)
downloadqpid-python-217b24c7e05ac60981d3e2e7c45f3ac27674d36f.tar.gz
This is related to QPID-1649
There is some code in AMQConnectionDelegate_0_10.java related to QPID-1645 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@742260 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java178
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java1
5 files changed, 194 insertions, 4 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 269937d0bd..6c9fcc0f4c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -369,7 +369,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
}
- _failoverPolicy = new FailoverPolicy(connectionURL);
+ _failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
if (brokerDetails.getTransport().equals(BrokerDetails.VM))
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 77860ed60c..29f1aec2f5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -154,16 +154,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0));
}
+ String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null?
+ brokerDetail.getProperty("sasl_mechs"):
+ System.getProperty("qpid.sasl_mechs","PLAIN");
+
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
- _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL());
+ _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs);
_conn._connected = true;
+ _conn._failoverPolicy.attainedConnection();
}
catch(ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
}
catch (ConnectionException e)
- {
+ {
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
index 4186706ade..bfd1161c90 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.jms;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.failover.FailoverExchangeMethod;
import org.apache.qpid.jms.failover.FailoverMethod;
import org.apache.qpid.jms.failover.FailoverRoundRobinServers;
import org.apache.qpid.jms.failover.FailoverSingleServer;
@@ -48,7 +50,7 @@ public class FailoverPolicy
private long _lastMethodTime;
private long _lastFailTime;
- public FailoverPolicy(ConnectionURL connectionDetails)
+ public FailoverPolicy(ConnectionURL connectionDetails, AMQConnection conn)
{
FailoverMethod method;
@@ -88,6 +90,10 @@ public class FailoverPolicy
{
method = new FailoverRoundRobinServers(connectionDetails);
}
+ else if (failoverMethod.equals(FailoverMethod.FAILOVER_EXCHANGE))
+ {
+ method = new FailoverExchangeMethod(connectionDetails, conn);
+ }
else
{
try
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
new file mode 100644
index 0000000000..6f945687cf
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.failover;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * When using the Failover exchange a single broker is supplied in the URL.
+ * The connection will then connect to the cluster using the above broker details.
+ * Once connected, the membership details of the cluster will be obtained via
+ * subscribing to a queue bound to the failover exchange.
+ *
+ * The failover exchange will provide a list of broker URLs in the format "transport:ip:port"
+ * Out of this list we only select brokers that match the transport of the original
+ * broker supplied in the connection URL.
+ *
+ * Also properties defined for the original broker will be applied to all the brokers selected
+ * from the list.
+ */
+
+public class FailoverExchangeMethod extends FailoverRoundRobinServers implements FailoverMethod, MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
+
+ /** This is not safe to use until attainConnection is called */
+ private AMQConnection _conn;
+
+ /** Protects the broker list when modifications happens */
+ private Object _brokerListLock = new Object();
+
+ /** The session used to subscribe to failover exchange */
+ private Session _ssn;
+
+ private BrokerDetails _orginalBrokerDetail;
+
+ public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
+ {
+ super(connectionDetails);
+ _orginalBrokerDetail = _connectionDetails.getBrokerDetails(0);
+
+ // This is not safe to use until attainConnection is called, as this ref will not initialized fully.
+ // The reason being this constructor is called inside the AMWConnection constructor.
+ // It would be best if we find a way to pass this ref after AMQConnection is fully initialized.
+ _conn = conn;
+ }
+
+ private void subscribeForUpdates() throws JMSException
+ {
+ if (_ssn == null)
+ {
+ _ssn = _conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = _ssn.createConsumer(
+ new AMQAnyDestination(new AMQShortString("amq.failover"),
+ new AMQShortString("amq.failover"),
+ new AMQShortString(""),
+ true,true,null,false,
+ new AMQShortString[0]));
+ cons.setMessageListener(this);
+ }
+ }
+
+ public void onMessage(Message m)
+ {
+ _logger.info("Failover exchange notified cluster membership change");
+ List<BrokerDetails> brokerList = new ArrayList<BrokerDetails>();
+ try
+ {
+ List<String> list = (List<String>)m.getObjectProperty("amq.failover");
+ for (String brokerEntry:list)
+ {
+ String[] urls = brokerEntry.substring(5) .split(",");
+ // Iterate until you find the correct transport
+ // Need to reconsider the logic when the C++ broker supports
+ // SSL URLs.
+ for (String url:urls)
+ {
+ String[] tokens = url.split(":");
+ if (tokens[0].equalsIgnoreCase(_orginalBrokerDetail.getTransport()))
+ {
+ BrokerDetails broker = new AMQBrokerDetails();
+ broker.setTransport(tokens[0]);
+ broker.setHost(tokens[1]);
+ broker.setPort(Integer.parseInt(tokens[2]));
+ broker.setProperties(_orginalBrokerDetail.getProperties());
+ broker.setSSLConfiguration(_orginalBrokerDetail.getSSLConfiguration());
+ brokerList.add(broker);
+ break;
+ }
+ }
+ }
+ }
+ catch(JMSException e)
+ {
+ _logger.error("Error parsing the message sent by failover exchange",e);
+ }
+
+ synchronized (_brokerListLock)
+ {
+ _connectionDetails.setBrokerDetails(brokerList);
+ }
+ }
+
+ public void attainedConnection()
+ {
+ super.attainedConnection();
+ try
+ {
+ subscribeForUpdates();
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Unable to subscribe for cluster membership updates",e);
+ }
+ }
+
+ public BrokerDetails getCurrentBrokerDetails()
+ {
+ synchronized (_brokerListLock)
+ {
+ return super.getCurrentBrokerDetails();
+ }
+ }
+
+ public BrokerDetails getNextBrokerDetails()
+ {
+ synchronized(_brokerListLock)
+ {
+ return super.getNextBrokerDetails();
+ }
+ }
+
+ public String methodName()
+ {
+ return "Failover Exchange";
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append("FailoverExchange:\n");
+ sb.append(super.toString());
+ return sb.toString();
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java
index d7ec46dea3..a921649829 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java
@@ -27,6 +27,7 @@ public interface FailoverMethod
{
public static final String SINGLE_BROKER = "singlebroker";
public static final String ROUND_ROBIN = "roundrobin";
+ public static final String FAILOVER_EXCHANGE= "failover_exchange";
public static final String RANDOM = "random";
/**
* Reset the Failover to initial conditions