diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 49 |
1 files changed, 18 insertions, 31 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index cc052f81df..50299fa9d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -215,12 +215,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect Exception lastException = new Exception(); lastException.initCause(new ConnectException()); - while (lastException != null && checkException(lastException) && _failoverPolicy.failoverAllowed()) + while (!_connected && _failoverPolicy.failoverAllowed()) { try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); lastException = null; + _connected = true; } catch (Exception e) { @@ -232,34 +233,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.debug("Are we connected:" + _connected); - // Then the Failover Thread will handle conneciton - if (_failoverPolicy.failoverAllowed()) - { - //TODO this needs to be redone so that we are not spinning. - // A suitable object should be set that is then waited on - // and only notified when a connection is made or when - // the AMQConnection gets closed. - while (!_connected && !_closed.get()) - { - try - { - _logger.debug("Sleeping."); - Thread.sleep(100); - } - catch (InterruptedException ie) - { - _logger.debug("Woken up."); - } - } - if (!_failoverPolicy.failoverAllowed() || _failoverPolicy.getCurrentBrokerDetails() == null) - { - if (_lastAMQException != null) - { - throw _lastAMQException; - } - } - } - else + if (!_connected) { String message = null; @@ -318,7 +292,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void setVirtualHost(String virtualHost) { - if(virtualHost.startsWith("/")) + if (virtualHost.startsWith("/")) { virtualHost = virtualHost.substring(1); } @@ -403,7 +377,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean failoverAllowed() { - return _failoverPolicy.failoverAllowed(); + if (!_connected) + { + return false; + } + else + { + return _failoverPolicy.failoverAllowed(); + } } public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException @@ -815,6 +796,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _protocolHandler; } + public boolean started() + { + return _started; + } + public void bytesSent(long writtenBytes) { if (_connectionListener != null) @@ -1031,4 +1017,5 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQConnectionFactory.class.getName(), null); // factory location } + } |