summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java49
1 files changed, 18 insertions, 31 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index cc052f81df..50299fa9d5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -215,12 +215,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Exception lastException = new Exception();
lastException.initCause(new ConnectException());
- while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed())
+ while (!_connected && _failoverPolicy.failoverAllowed())
{
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
lastException = null;
+ _connected = true;
}
catch (Exception e)
{
@@ -232,34 +233,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.debug("Are we connected:" + _connected);
- // Then the Failover Thread will handle conneciton
- if (_failoverPolicy.failoverAllowed())
- {
- //TODO this needs to be redone so that we are not spinning.
- // A suitable object should be set that is then waited on
- // and only notified when a connection is made or when
- // the AMQConnection gets closed.
- while (!_connected && !_closed.get())
- {
- try
- {
- _logger.debug("Sleeping.");
- Thread.sleep(100);
- }
- catch (InterruptedException ie)
- {
- _logger.debug("Woken up.");
- }
- }
- if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null)
- {
- if (_lastAMQException != null)
- {
- throw _lastAMQException;
- }
- }
- }
- else
+ if (!_connected)
{
String message = null;
@@ -318,7 +292,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void setVirtualHost(String virtualHost)
{
- if(virtualHost.startsWith("/"))
+ if (virtualHost.startsWith("/"))
{
virtualHost = virtualHost.substring(1);
}
@@ -403,7 +377,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean failoverAllowed()
{
- return _failoverPolicy.failoverAllowed();
+ if (!_connected)
+ {
+ return false;
+ }
+ else
+ {
+ return _failoverPolicy.failoverAllowed();
+ }
}
public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
@@ -815,6 +796,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _protocolHandler;
}
+ public boolean started()
+ {
+ return _started;
+ }
+
public void bytesSent(long writtenBytes)
{
if (_connectionListener != null)
@@ -1031,4 +1017,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQConnectionFactory.class.getName(),
null); // factory location
}
+
}