diff options
author | Aidan Skinner <aidan@apache.org> | 2008-07-03 14:33:10 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-07-03 14:33:10 +0000 |
commit | 68ab8c644926c013554a6a5ea9c593bcc6462af0 (patch) | |
tree | 66d6c68c9ce3b2360ff1ae0bd8dcf8cea2ad3454 | |
parent | 8296f73e2535f6ad8dcc9421955b5e832dd0aff6 (diff) | |
download | qpid-python-68ab8c644926c013554a6a5ea9c593bcc6462af0.tar.gz |
QPID-962 Exception handling was... unpleasing... Fix up of patch from rhs
AMQConnection.java: Refactor listener and stack exceptions in a list. Add get lastException, which can now be any Exception. Don't set connected, let the delegate decide.
AMQConnectionDelegate_8_0.java, AMQConnectionDelete_0_10.java: set _connected to true if we suceed
AMQProtocolHandler.java: attainState can now throw any sort of Exception
AMQStateManager.java: attainState can now throw any Exception
ConnectionTest.java: check that exception cause is not null
AMQConnectionFailureException.java: Add ability to store a Collection of Exceptions in case there are multiple possible causes of the failure. Which there shouldn't be, but it can happen.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673688 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 85 insertions, 57 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 04f5a6d204..0abcc8ef26 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -75,6 +76,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); private int _size = 0; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + public AMQSession get(int channelId) { @@ -232,11 +234,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect protected boolean _connected; /* - * The last error code that occured on the connection. Used to return the correct exception to the client - */ - protected AMQException _lastAMQException = null; - - /* * The connection meta data */ private QpidConnectionMetaData _connectionMetaData; @@ -261,6 +258,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates whether persistent messages are synchronized private boolean _syncPersistence; + + /** used to hold a list of all exceptions that have been thrown during connection construction. gross */ + final ArrayList<Exception> _exceptions = new ArrayList<Exception>(); /** * @param broker brokerdetails @@ -378,13 +378,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - final ArrayList<JMSException> exceptions = new ArrayList<JMSException>(); - + class Listener implements ExceptionListener { public void onException(JMSException e) { - exceptions.add(e); + _exceptions.add(e); } } @@ -443,9 +442,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // We are not currently connected _connected = false; - Exception lastException = new Exception(); - lastException.initCause(new ConnectException()); - // TMG FIXME this seems... wrong... boolean retryAllowed = true; while (!_connected && retryAllowed ) @@ -453,8 +449,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(brokerDetails); - lastException = null; - _connected = true; } catch (AMQProtocolException pe) { @@ -470,17 +464,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (Exception e) { - lastException = e; - + _exceptions.add(e); if (_logger.isInfoEnabled()) { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), - e.getCause()); + _logger.info("Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails(), + e); } + } + + if (!_connected) + { retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } + try + { + setExceptionListener(null); + } + catch (JMSException e1) + { + // Can't happen + } if (_logger.isDebugEnabled()) { @@ -498,24 +504,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { // Eat it, we've hopefully got all the exceptions if this happened } - if (exceptions.size() > 0) - { - JMSException e = exceptions.get(0); - int code = -1; - try - { - code = new Integer(e.getErrorCode()).intValue(); - } - catch (NumberFormatException nfe) - { - // Ignore this, we have some error codes and messages swapped around - } - - throw new AMQConnectionFailureException(AMQConstant.getConstant(code), - e.getMessage(), e); - } - else if (lastException != null) + + Exception lastException = null; + if (_exceptions.size() > 0) { + lastException = _exceptions.get(_exceptions.size() - 1); if (lastException.getCause() != null) { message = lastException.getCause().getMessage(); @@ -538,8 +531,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - AMQException e = new AMQConnectionFailureException(message, null); - + AMQException e = new AMQConnectionFailureException(message, _exceptions); + if (lastException != null) { if (lastException instanceof UnresolvedAddressException) @@ -547,13 +540,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(), null); } - - if (e.getCause() != null) - { - e.initCause(lastException); - } + } - throw e; } @@ -1507,4 +1495,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } + + public Exception getLastException() + { + if (_exceptions.size() > 0) + { + return _exceptions.get(_exceptions.size() - 1); + } + return null; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 61c06df7a5..825a52c5cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -115,6 +115,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword()); _qpidConnection.setClosedListener(this); + _conn._connected = true; } catch(ProtocolException pe) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index b5b28e0b28..5074658070 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -25,11 +25,14 @@ import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.EnumSet; import java.util.Iterator; +import java.util.Set; import javax.jms.JMSException; import javax.jms.XASession; +import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -76,24 +79,21 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); } - public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException + public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException { - try + final Set<AMQState> openOrClosedStates = + EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); + + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + // this blocks until the connection has been set up or when an error + // has prevented the connection being set up + + AMQState state = _conn._protocolHandler.attainState(openOrClosedStates); + if(state == AMQState.CONNECTION_OPEN) { - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); - // this blocks until the connection has been set up or when an error - // has prevented the connection being set up - _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN); _conn._failoverPolicy.attainedConnection(); - - // Again this should be changed to a suitable notify _conn._connected = true; } - catch (AMQException e) - { - _conn._lastAMQException = e; - throw e; - } } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 2d8074eea2..1b75d6e829 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -559,7 +559,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter _frameListeners.remove(listener); } */ - public void attainState(AMQState s) throws AMQException + public void attainState(AMQState s) throws Exception { getStateManager().attainState(s); } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index eda1a1f5fd..21f190bd7e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -102,7 +102,7 @@ public class AMQStateManager } - public void attainState(final AMQState s) throws AMQException + public void attainState(final AMQState s) throws Exception { synchronized (_stateLock) { @@ -118,6 +118,11 @@ public class AMQStateManager catch (InterruptedException e) { _logger.warn("Thread interrupted"); + if (_protocolSession.getAMQConnection().getLastException() != null) + { + throw _protocolSession.getAMQConnection().getLastException(); + } + } if (_currentState != s) @@ -169,6 +174,11 @@ public class AMQStateManager catch (InterruptedException e) { _logger.warn("Thread interrupted"); + if (_protocolSession.getAMQConnection().getLastException() != null) + { + throw new AMQException(null, "Could not attain state due to exception", + _protocolSession.getAMQConnection().getLastException()); + } } if (!stateSet.contains(_currentState)) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index f856e8c20b..97eed08ab1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -134,6 +134,7 @@ public class ConnectionTest extends TestCase } catch (AMQException amqe) { + assertNotNull("No cause set", amqe.getCause()); if (amqe.getCause().getClass() == Exception.class) { System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure."); diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index 6cdd57d6f2..fa69f7f91b 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -21,6 +21,10 @@ package org.apache.qpid; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; + import org.apache.qpid.protocol.AMQConstant; /** @@ -35,6 +39,8 @@ import org.apache.qpid.protocol.AMQConstant; */ public class AMQConnectionFailureException extends AMQException { + Collection<Exception> _exceptions; + public AMQConnectionFailureException(String message, Throwable cause) { super(null, message, cause); @@ -44,4 +50,16 @@ public class AMQConnectionFailureException extends AMQException { super(errorCode, message, cause); } + + public AMQConnectionFailureException(String message, Collection<Exception> exceptions) + { + // Blah, I hate ? but java won't let super() be anything other than the first thing, sorry... + super (null, message, exceptions.isEmpty() ? null : exceptions.iterator().next()); + this._exceptions = exceptions; + } + + public Collection<Exception> getLinkedExceptions() + { + return _exceptions; + } } |