summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-07-03 14:33:10 +0000
committerAidan Skinner <aidan@apache.org>2008-07-03 14:33:10 +0000
commit68ab8c644926c013554a6a5ea9c593bcc6462af0 (patch)
tree66d6c68c9ce3b2360ff1ae0bd8dcf8cea2ad3454
parent8296f73e2535f6ad8dcc9421955b5e832dd0aff6 (diff)
downloadqpid-python-68ab8c644926c013554a6a5ea9c593bcc6462af0.tar.gz
QPID-962 Exception handling was... unpleasing... Fix up of patch from rhs
AMQConnection.java: Refactor listener and stack exceptions in a list. Add get lastException, which can now be any Exception. Don't set connected, let the delegate decide. AMQConnectionDelegate_8_0.java, AMQConnectionDelete_0_10.java: set _connected to true if we suceed AMQProtocolHandler.java: attainState can now throw any sort of Exception AMQStateManager.java: attainState can now throw any Exception ConnectionTest.java: check that exception cause is not null AMQConnectionFailureException.java: Add ability to store a Collection of Exceptions in case there are multiple possible causes of the failure. Which there shouldn't be, but it can happen. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673688 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java12
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java18
7 files changed, 85 insertions, 57 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 04f5a6d204..0abcc8ef26 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -75,6 +76,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
private int _size = 0;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
public AMQSession get(int channelId)
{
@@ -232,11 +234,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
protected boolean _connected;
/*
- * The last error code that occured on the connection. Used to return the correct exception to the client
- */
- protected AMQException _lastAMQException = null;
-
- /*
* The connection meta data
*/
private QpidConnectionMetaData _connectionMetaData;
@@ -261,6 +258,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
+
+ /** used to hold a list of all exceptions that have been thrown during connection construction. gross */
+ final ArrayList<Exception> _exceptions = new ArrayList<Exception>();
/**
* @param broker brokerdetails
@@ -378,13 +378,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_delegate = new AMQConnectionDelegate_0_10(this);
}
- final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
-
+
class Listener implements ExceptionListener
{
public void onException(JMSException e)
{
- exceptions.add(e);
+ _exceptions.add(e);
}
}
@@ -443,9 +442,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// We are not currently connected
_connected = false;
- Exception lastException = new Exception();
- lastException.initCause(new ConnectException());
-
// TMG FIXME this seems... wrong...
boolean retryAllowed = true;
while (!_connected && retryAllowed )
@@ -453,8 +449,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(brokerDetails);
- lastException = null;
- _connected = true;
}
catch (AMQProtocolException pe)
{
@@ -470,17 +464,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (Exception e)
{
- lastException = e;
-
+ _exceptions.add(e);
if (_logger.isInfoEnabled())
{
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(),
- e.getCause());
+ _logger.info("Unable to connect to broker at " +
+ _failoverPolicy.getCurrentBrokerDetails(),
+ e);
}
+ }
+
+ if (!_connected)
+ {
retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
+ try
+ {
+ setExceptionListener(null);
+ }
+ catch (JMSException e1)
+ {
+ // Can't happen
+ }
if (_logger.isDebugEnabled())
{
@@ -498,24 +504,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
// Eat it, we've hopefully got all the exceptions if this happened
}
- if (exceptions.size() > 0)
- {
- JMSException e = exceptions.get(0);
- int code = -1;
- try
- {
- code = new Integer(e.getErrorCode()).intValue();
- }
- catch (NumberFormatException nfe)
- {
- // Ignore this, we have some error codes and messages swapped around
- }
-
- throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
- e.getMessage(), e);
- }
- else if (lastException != null)
+
+ Exception lastException = null;
+ if (_exceptions.size() > 0)
{
+ lastException = _exceptions.get(_exceptions.size() - 1);
if (lastException.getCause() != null)
{
message = lastException.getCause().getMessage();
@@ -538,8 +531,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- AMQException e = new AMQConnectionFailureException(message, null);
-
+ AMQException e = new AMQConnectionFailureException(message, _exceptions);
+
if (lastException != null)
{
if (lastException instanceof UnresolvedAddressException)
@@ -547,13 +540,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
null);
}
-
- if (e.getCause() != null)
- {
- e.initCause(lastException);
- }
+
}
-
throw e;
}
@@ -1507,4 +1495,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncPersistence;
}
+
+ public Exception getLastException()
+ {
+ if (_exceptions.size() > 0)
+ {
+ return _exceptions.get(_exceptions.size() - 1);
+ }
+ return null;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 61c06df7a5..825a52c5cb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -115,6 +115,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword());
_qpidConnection.setClosedListener(this);
+ _conn._connected = true;
}
catch(ProtocolException pe)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index b5b28e0b28..5074658070 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -25,11 +25,14 @@ import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Iterator;
+import java.util.Set;
import javax.jms.JMSException;
import javax.jms.XASession;
+import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -76,24 +79,21 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
- try
+ final Set<AMQState> openOrClosedStates =
+ EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
+
+ TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+ // this blocks until the connection has been set up or when an error
+ // has prevented the connection being set up
+
+ AMQState state = _conn._protocolHandler.attainState(openOrClosedStates);
+ if(state == AMQState.CONNECTION_OPEN)
{
- TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- // this blocks until the connection has been set up or when an error
- // has prevented the connection being set up
- _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN);
_conn._failoverPolicy.attainedConnection();
-
- // Again this should be changed to a suitable notify
_conn._connected = true;
}
- catch (AMQException e)
- {
- _conn._lastAMQException = e;
- throw e;
- }
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 2d8074eea2..1b75d6e829 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -559,7 +559,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_frameListeners.remove(listener);
}
*/
- public void attainState(AMQState s) throws AMQException
+ public void attainState(AMQState s) throws Exception
{
getStateManager().attainState(s);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index eda1a1f5fd..21f190bd7e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -102,7 +102,7 @@ public class AMQStateManager
}
- public void attainState(final AMQState s) throws AMQException
+ public void attainState(final AMQState s) throws Exception
{
synchronized (_stateLock)
{
@@ -118,6 +118,11 @@ public class AMQStateManager
catch (InterruptedException e)
{
_logger.warn("Thread interrupted");
+ if (_protocolSession.getAMQConnection().getLastException() != null)
+ {
+ throw _protocolSession.getAMQConnection().getLastException();
+ }
+
}
if (_currentState != s)
@@ -169,6 +174,11 @@ public class AMQStateManager
catch (InterruptedException e)
{
_logger.warn("Thread interrupted");
+ if (_protocolSession.getAMQConnection().getLastException() != null)
+ {
+ throw new AMQException(null, "Could not attain state due to exception",
+ _protocolSession.getAMQConnection().getLastException());
+ }
}
if (!stateSet.contains(_currentState))
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index f856e8c20b..97eed08ab1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -134,6 +134,7 @@ public class ConnectionTest extends TestCase
}
catch (AMQException amqe)
{
+ assertNotNull("No cause set", amqe.getCause());
if (amqe.getCause().getClass() == Exception.class)
{
System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure.");
diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
index 6cdd57d6f2..fa69f7f91b 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
@@ -21,6 +21,10 @@
package org.apache.qpid;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
import org.apache.qpid.protocol.AMQConstant;
/**
@@ -35,6 +39,8 @@ import org.apache.qpid.protocol.AMQConstant;
*/
public class AMQConnectionFailureException extends AMQException
{
+ Collection<Exception> _exceptions;
+
public AMQConnectionFailureException(String message, Throwable cause)
{
super(null, message, cause);
@@ -44,4 +50,16 @@ public class AMQConnectionFailureException extends AMQException
{
super(errorCode, message, cause);
}
+
+ public AMQConnectionFailureException(String message, Collection<Exception> exceptions)
+ {
+ // Blah, I hate ? but java won't let super() be anything other than the first thing, sorry...
+ super (null, message, exceptions.isEmpty() ? null : exceptions.iterator().next());
+ this._exceptions = exceptions;
+ }
+
+ public Collection<Exception> getLinkedExceptions()
+ {
+ return _exceptions;
+ }
}