summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-09-25 10:50:01 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-09-25 10:50:01 +0000
commit4773115f9b37cc7aea6278965e49088da72eb279 (patch)
treeeb2dcb355ef819bf769874d85bdf9fa9af92e964 /java/client/src
parent938c3e0d61c463461c716ad5d3515a5b927842d3 (diff)
downloadqpid-python-4773115f9b37cc7aea6278965e49088da72eb279.tar.gz
AMQBrokerDetails.java - Changed default SSL value to use a static constant
AMQConnection.java - Fixed initial constructor connection, exception handling to check exception or cause for ConnectionExceptions and UnresolvedAddresses. AMQProtocolHandler.java - removed //todo as auth failure handling is now done. Added new transport exceptions AMQNoTransportForProtocolException.java and AMQTransportConnectionException.java as part of the TransportConnection changes. TransportConnection.java - Major changes to use the transport specified in the BrokerDetails object to dictate the type of Transport used for the connection. Currently only tcp and vm are supported. VmPipeTransportConnection.java - moved from test classes. Created vmbroker package for VMBrokerCreateException. Allows client to distinguish between a connection failure and a problem starting the broker to connect to it. BrokerDetails.java - Added default transport and set the current transport string values, 'tcp' and 'vm' FailoverMethod.java - Added singlebroker for completeness. FailoverPolicy.java - Allowed method type singlebroker to be created. Changed else if to else { if git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@449640 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/org/apache/qpid/client/AMQBrokerDetails.java2
-rw-r--r--java/client/src/org/apache/qpid/client/AMQConnection.java25
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java1
-rw-r--r--java/client/src/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java50
-rw-r--r--java/client/src/org/apache/qpid/client/transport/AMQTransportConnectionException.java29
-rw-r--r--java/client/src/org/apache/qpid/client/transport/TransportConnection.java229
-rw-r--r--java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java65
-rw-r--r--java/client/src/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java41
-rw-r--r--java/client/src/org/apache/qpid/jms/BrokerDetails.java17
-rw-r--r--java/client/src/org/apache/qpid/jms/FailoverPolicy.java42
-rw-r--r--java/client/src/org/apache/qpid/jms/failover/FailoverMethod.java1
11 files changed, 454 insertions, 48 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java
index 52de858b13..43e1672564 100644
--- a/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/org/apache/qpid/client/AMQBrokerDetails.java
@@ -289,7 +289,7 @@ public class AMQBrokerDetails implements BrokerDetails
return _options.get(OPTIONS_SSL).equalsIgnoreCase("true");
}
- return false;
+ return USE_SSL_DEFAULT;
}
public void useSSL(boolean ssl)
diff --git a/java/client/src/org/apache/qpid/client/AMQConnection.java b/java/client/src/org/apache/qpid/client/AMQConnection.java
index 3a4a6fd5d9..f8bea185d2 100644
--- a/java/client/src/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/org/apache/qpid/client/AMQConnection.java
@@ -186,7 +186,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Exception lastException = new Exception();
lastException.initCause(new ConnectException());
- while (lastException != null && lastException.getCause() instanceof ConnectException && _failoverPolicy.failoverAllowed())
+ while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed())
{
try
{
@@ -198,8 +198,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
lastException = e;
_logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e.getCause());
- _logger.info(e);
- _logger.info(e.getCause());
}
}
@@ -259,7 +257,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (lastException instanceof UnresolvedAddressException)
{
- e = new AMQUnresolvedAddressException(message);
+ e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString());
}
e.initCause(lastException);
}
@@ -268,6 +266,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+ protected boolean checkException(Throwable thrown)
+ {
+ Throwable cause = thrown.getCause();
+
+ if (cause == null)
+ {
+ cause = thrown;
+ }
+
+ return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
+ }
+
protected AMQConnection(String username, String password, String clientName, String virtualHost)
{
_clientName = clientName;
@@ -280,7 +290,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
- TransportConnection.getInstance().connect(_protocolHandler, brokerDetail);
+ TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
@@ -387,7 +397,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// open it, so that there is no window where we could receive data on the channel and not be set
// up to handle it appropriately.
AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode,
- prefetch);
+ prefetch);
_protocolHandler.addSessionByChannel(channelId, session);
registerSession(channelId, session);
@@ -405,7 +415,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
finally
{
- if (!success) {
+ if (!success)
+ {
_protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
}
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 0c4dbcbb26..0bfafa92b4 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -157,7 +157,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void sessionClosed(IoSession session) throws Exception
{
- //todo server just closes session with no warning if auth fails.
if (_connection.isClosed())
{
_logger.info("Session closed called by client");
diff --git a/java/client/src/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
new file mode 100644
index 0000000000..6faed9bb5f
--- /dev/null
+++ b/java/client/src/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.transport;
+
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.AMQException;
+
+public class AMQNoTransportForProtocolException extends AMQTransportConnectionException
+{
+ BrokerDetails _details;
+
+ public AMQNoTransportForProtocolException(BrokerDetails details)
+ {
+ this(details, "No Transport exists for specified broker protocol");
+ }
+
+ public AMQNoTransportForProtocolException(BrokerDetails details, String message)
+ {
+ super(message);
+
+ _details = details;
+ }
+
+ public String toString()
+ {
+ if (_details != null)
+ {
+ return super.toString() + _details.toString();
+ }
+ else
+ {
+ return super.toString();
+ }
+ }
+}
diff --git a/java/client/src/org/apache/qpid/client/transport/AMQTransportConnectionException.java b/java/client/src/org/apache/qpid/client/transport/AMQTransportConnectionException.java
new file mode 100644
index 0000000000..a40cea7807
--- /dev/null
+++ b/java/client/src/org/apache/qpid/client/transport/AMQTransportConnectionException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.transport;
+
+import org.apache.qpid.AMQException;
+
+public class AMQTransportConnectionException extends AMQException
+{
+ public AMQTransportConnectionException(String message)
+ {
+ super(message);
+
+ }
+}
diff --git a/java/client/src/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
index a898e182f7..6894aa25a8 100644
--- a/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
@@ -17,14 +17,29 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.log4j.Logger;
import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.transport.VmPipeTransportConnection;
+import org.apache.qpid.vmbroker.AMQVMBrokerCreationException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up
* the underlying connector, which currently always uses TCP/IP sockets. It creates the
* "protocol handler" which deals with MINA protocol events.
- *
+ * <p/>
* Could be extended in future to support different transport types by turning this into concrete class/interface
* combo.
*/
@@ -32,40 +47,214 @@ public class TransportConnection
{
private static ITransportConnection _instance;
+ private static Map _inVmPipeAddress = new HashMap();
+ private static VmPipeAcceptor _acceptor;
+ private static int _currentInstance = -1;
+ private static int _currentVMPort = -1;
+
+ private static final int TCP = 0;
+ private static final int VM = 1;
+
+ private static Logger _logger = Logger.getLogger(TransportConnection.class);
+
+ private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+
static
{
- if (Boolean.getBoolean("amqj.useBlockingIo"))
+ _acceptor = new VmPipeAcceptor();
+
+ IoServiceConfig config = _acceptor.getDefaultConfig();
+
+ config.setThreadModel(new ReadWriteThreadModel());
+ }
+
+ public static ITransportConnection getInstance() throws AMQTransportConnectionException
+ {
+ AMQBrokerDetails details = new AMQBrokerDetails();
+ details.setTransport(BrokerDetails.TCP);
+ return getInstance(details);
+ }
+
+ public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+ {
+ int transport = getTransport(details.getTransport());
+
+ if (transport == -1)
+
{
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() {
- public IoConnector newSocketConnector() {
- return new org.apache.qpid.bio.SocketConnector(); // blocking connector
+ throw new AMQNoTransportForProtocolException(details);
+ }
+
+ if (transport == _currentInstance)
+
+ {
+ if (transport == VM)
+ {
+ if (_currentVMPort == details.getPort())
+ {
+ return _instance;
}
- });
+ }
+ else
+ {
+ return _instance;
+ }
}
- else
+
+ _currentInstance = transport;
+
+ switch (transport)
+
{
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() {
- public IoConnector newSocketConnector() {
- SocketConnector result = new SocketConnector(); // non-blocking connector
+ case TCP:
+ if (Boolean.getBoolean("amqj.useBlockingIo"))
+ {
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new org.apache.qpid.bio.SocketConnector(); // blocking connector
+ }
+ });
+ }
+ else
+ {
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ SocketConnector result = new SocketConnector(); // non-blocking connector
+
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0L);
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0L);
+ return result;
+ }
+ });
- return result;
}
- });
+ break;
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.NoAutoCreateVMBroker"));
+ break;
+ }
}
+
+ return _instance;
}
- public static void setInstance(ITransportConnection transport)
+ private static int getTransport(String transport)
{
- _instance = transport;
+ if (transport.equals(BrokerDetails.TCP))
+ {
+ return TCP;
+ }
+
+ if (transport.equals(BrokerDetails.VM))
+ {
+ return VM;
+ }
+
+ return -1;
}
- public static ITransportConnection getInstance()
+ private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException
{
- return _instance;
+ int port = details.getPort();
+
+ if (!_inVmPipeAddress.containsKey(port))
+ {
+ if (noAutoCreate)
+ {
+ throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
+ }
+ else
+ {
+ createVMBroker(port);
+ }
+ }
+
+ return new VmPipeTransportConnection(port);
+ }
+
+
+ public static void createVMBroker(int port) throws AMQVMBrokerCreationException
+ {
+
+
+ if (!_inVmPipeAddress.containsKey(port))
+ {
+ _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+
+ try
+ {
+ VmPipeAddress pipe = new VmPipeAddress(port);
+
+ String protocolProviderClass = System.getProperty("amqj.protocolprovider.class", DEFAULT_QPID_SERVER);
+ _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
+
+ // can't use introspection to get Provider as it is a server class.
+ // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
+
+ //get right constructor and pass in instancec ID - "port"
+ IoHandlerAdapter provider;
+ try
+ {
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
+ provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ }
+ catch (Exception e)
+ {
+ _logger.info("Unable to create InVM Qpid.AMQP on port " + port);
+ _logger.info(e);
+ throw new AMQVMBrokerCreationException(port, "Unable to create InVM Qpid.AMQP on port " + port);
+ }
+
+ _acceptor.bind(pipe, provider);
+
+ _inVmPipeAddress.put(port, pipe);
+ _logger.info("Created InVM Qpid.AMQP listening on port " + port);
+ }
+ catch (IOException e)
+ {
+ throw new AMQVMBrokerCreationException(port, "Unable to create InVM Qpid.AMQP on port " + port);
+ }
+ }
+ else
+ {
+ _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
+ }
+
+ }
+
+ public static void killAllVMBrokers()
+ {
+ _logger.info("Killing all VM Brokers");
+ _acceptor.unbindAll();
+
+ Iterator keys = _inVmPipeAddress.keySet().iterator();
+
+ while (keys.hasNext())
+ {
+ int id = (Integer) keys.next();
+ _inVmPipeAddress.remove(id);
+ }
+
}
+
+ public static void killVMBroker(int port)
+ {
+ VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+ if (pipe != null)
+ {
+ _logger.info("Killing VM Broker:" + port);
+ _acceptor.unbind(pipe);
+ _inVmPipeAddress.remove(port);
+ }
+ }
+
}
diff --git a/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
new file mode 100644
index 0000000000..decfbd57e0
--- /dev/null
+++ b/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.transport.ITransportConnection;
+import org.apache.qpid.pool.PoolingFilter;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+
+import java.io.IOException;
+
+public class VmPipeTransportConnection implements ITransportConnection
+{
+ private static final Logger _logger = Logger.getLogger(VmPipeTransportConnection.class);
+
+ private static int _port;
+
+ public VmPipeTransportConnection(int port)
+ {
+ _port = port;
+ }
+
+ public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
+ {
+ final VmPipeConnector ioConnector = new VmPipeConnector();
+ final IoServiceConfig cfg = ioConnector.getDefaultConfig();
+ ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
+ PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
+ "AsynchronousReadFilter");
+ cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+ PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
+ "AsynchronousWriteFilter");
+ cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+
+ final VmPipeAddress address = new VmPipeAddress(_port);
+ _logger.info("Attempting connection to " + address);
+ ConnectFuture future = ioConnector.connect(address, protocolHandler);
+ // wait for connection to complete
+ future.join();
+ // we call getSession which throws an IOException if there has been an error connecting
+ future.getSession();
+ }
+}
diff --git a/java/client/src/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
new file mode 100644
index 0000000000..6ff91ea6e3
--- /dev/null
+++ b/java/client/src/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.vmbroker;
+
+import org.apache.qpid.client.transport.AMQTransportConnectionException;
+
+public class AMQVMBrokerCreationException extends AMQTransportConnectionException
+{
+ private int _port;
+
+ public AMQVMBrokerCreationException(int port)
+ {
+ this(port, "Unable to create vm broker");
+ }
+
+ public AMQVMBrokerCreationException(int port, String message)
+ {
+ super(message);
+ _port = port;
+ }
+
+ public String toString()
+ {
+ return super.toString() + " on port " + _port;
+ }
+}
diff --git a/java/client/src/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/org/apache/qpid/jms/BrokerDetails.java
index fc8af2091e..25efd261f9 100644
--- a/java/client/src/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/org/apache/qpid/jms/BrokerDetails.java
@@ -28,29 +28,40 @@ public interface BrokerDetails
public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL;
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final int DEFAULT_PORT = 5672;
- public static final String DEFAULT_TRANSPORT = "tcp";
+
+ public static final String TCP = "tcp";
+ public static final String VM = "vm";
+
+ public static final String DEFAULT_TRANSPORT = TCP;
public static final String URL_FORMAT_EXAMPLE =
- "<transport>://<hostname>[:<port Default=\""+DEFAULT_PORT+"\">][?<option>='<value>'[,<option>='<value>']]";
+ "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
public static final long DEFAULT_CONNECT_TIMEOUT = 30000L;
+ public static final boolean USE_SSL_DEFAULT = false;
String getHost();
+
void setHost(String host);
int getPort();
+
void setPort(int port);
String getTransport();
+
void setTransport(String transport);
boolean useSSL();
+
void useSSL(boolean ssl);
String getOption(String key);
- void setOption(String key,String value);
+
+ void setOption(String key, String value);
long getTimeout();
+
void setTimeout(long timeout);
String toString();
diff --git a/java/client/src/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/org/apache/qpid/jms/FailoverPolicy.java
index a1a89f8a66..8c0e982478 100644
--- a/java/client/src/org/apache/qpid/jms/FailoverPolicy.java
+++ b/java/client/src/org/apache/qpid/jms/FailoverPolicy.java
@@ -75,24 +75,31 @@ public class FailoverPolicy
//todo write a random connection Failover
}
*/
- if (failoverMethod.equals(FailoverMethod.ROUND_ROBIN))
+ if (failoverMethod.equals(FailoverMethod.SINGLE_BROKER))
{
method = new FailoverRoundRobinServers(connectionDetails);
}
else
{
- try
+ if (failoverMethod.equals(FailoverMethod.ROUND_ROBIN))
{
- Class[] constructorSpec = {ConnectionURL.class};
- Object [] params = {connectionDetails};
-
- method = (FailoverMethod) ClassLoader.getSystemClassLoader().
- loadClass(failoverMethod).
- getConstructor(constructorSpec).newInstance(params);
+ method = new FailoverRoundRobinServers(connectionDetails);
}
- catch (Exception cnfe)
+ else
{
- throw new IllegalArgumentException("Unknown failover method:" + failoverMethod);
+ try
+ {
+ Class[] constructorSpec = {ConnectionURL.class};
+ Object [] params = {connectionDetails};
+
+ method = (FailoverMethod) ClassLoader.getSystemClassLoader().
+ loadClass(failoverMethod).
+ getConstructor(constructorSpec).newInstance(params);
+ }
+ catch (Exception cnfe)
+ {
+ throw new IllegalArgumentException("Unknown failover method:" + failoverMethod);
+ }
}
}
}
@@ -149,14 +156,17 @@ public class FailoverPolicy
}
- else if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT)
- {
- _logger.info("Failover timeout");
- return false;
- }
else
{
- _lastMethodTime = now;
+ if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT)
+ {
+ _logger.info("Failover timeout");
+ return false;
+ }
+ else
+ {
+ _lastMethodTime = now;
+ }
}
}
else
diff --git a/java/client/src/org/apache/qpid/jms/failover/FailoverMethod.java b/java/client/src/org/apache/qpid/jms/failover/FailoverMethod.java
index ff1336eb9d..2e14348ad0 100644
--- a/java/client/src/org/apache/qpid/jms/failover/FailoverMethod.java
+++ b/java/client/src/org/apache/qpid/jms/failover/FailoverMethod.java
@@ -22,6 +22,7 @@ import org.apache.qpid.jms.BrokerDetails;
public interface FailoverMethod
{
+ public static final String SINGLE_BROKER = "singlebroker";
public static final String ROUND_ROBIN = "roundrobin";
public static final String RANDOM = "random";
/**