summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-09-30 14:56:57 +0000
committerKeith Wall <kwall@apache.org>2011-09-30 14:56:57 +0000
commitaeccf26719f22057b385f9ef3827614fd080913b (patch)
treeb050b2f0e65836404f0384782e8b99cebe22aaa8
parent5a7130b942767066a1256241a1657928b0dc34f7 (diff)
downloadqpid-python-aeccf26719f22057b385f9ef3827614fd080913b.tar.gz
QPID-3512: Avoid race during 0-8..0-9-1 connection close.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1177689 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java17
5 files changed, 59 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 92f9ebe07c..b1a22155d6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -36,11 +36,13 @@ import javax.jms.XASession;
import javax.net.ssl.SSLContext;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
@@ -69,8 +71,30 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
public void closeConnection(long timeout) throws JMSException, AMQException
{
- _conn.getProtocolHandler().closeConnection(timeout);
+ final AMQStateManager stateManager = _conn.getProtocolHandler().getStateManager();
+ final AMQState currentState = stateManager.getCurrentState();
+ if (currentState.equals(AMQState.CONNECTION_CLOSED))
+ {
+ _logger.debug("Connection already closed.");
+ }
+ else if (currentState.equals(AMQState.CONNECTION_CLOSING))
+ {
+ _logger.debug("Connection already closing, awaiting closed state.");
+ final StateWaiter closeWaiter = new StateWaiter(stateManager, currentState, EnumSet.of(AMQState.CONNECTION_CLOSED));
+ try
+ {
+ closeWaiter.await(timeout);
+ }
+ catch (AMQTimeoutException te)
+ {
+ throw new AMQTimeoutException("Close did not complete in timely fashion", te);
+ }
+ }
+ else
+ {
+ _conn.getProtocolHandler().closeConnection(timeout);
+ }
}
public AMQConnectionDelegate_8_0(AMQConnection conn)
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index de21b874e8..284954edba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -665,22 +665,21 @@ public class AMQProtocolHandler implements ProtocolEngine
* <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
* anyway.
*
- * @param timeout The timeout to wait for an acknowledgement to the close request.
+ * @param timeout The timeout to wait for an acknowledgment to the close request.
*
* @throws AMQException If the close fails for any reason.
*/
public void closeConnection(long timeout) throws AMQException
{
- ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."), 0, 0);
-
- final AMQFrame frame = body.generateFrame(0);
-
- //If the connection is already closed then don't do a syncWrite
if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
+ // Connection is already closed then don't do a syncWrite
try
{
+ final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
+ final AMQFrame frame = body.generateFrame(0);
+
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
_network.close();
closed();
@@ -691,10 +690,9 @@ public class AMQProtocolHandler implements ProtocolEngine
}
catch (FailoverException e)
{
- _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
+ _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
}
}
-
}
/** @return the number of bytes read from this protocol session */
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 9c7d62670c..0d6fc727c1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.io.IOException;
/**
* The state manager is responsible for managing the state of the protocol session. <p/>
@@ -48,7 +47,7 @@ import java.io.IOException;
*
* The two step process is required as there is an inherit race condition between starting a process that will cause
* the state to change and then attempting to wait for that change. The interest in the change must be first set up so
- * that any asynchrous errors that occur can be delivered to the correct waiters.
+ * that any asynchronous errors that occur can be delivered to the correct waiters.
*/
public class AMQStateManager implements AMQMethodListener
{
@@ -84,7 +83,10 @@ public class AMQStateManager implements AMQMethodListener
public AMQState getCurrentState()
{
- return _currentState;
+ synchronized (_stateLock)
+ {
+ return _currentState;
+ }
}
public void changeState(AMQState newState)
@@ -114,7 +116,7 @@ public class AMQStateManager implements AMQMethodListener
}
/**
- * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+ * Setting of the ProtocolSession will be required when Failover has been successfully completed.
*
* The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
* connection to the network.
@@ -131,9 +133,9 @@ public class AMQStateManager implements AMQMethodListener
}
/**
- * Propogate error to waiters
+ * Propagate error to waiters
*
- * @param error The error to propogate.
+ * @param error The error to propagate.
*/
public void error(Exception error)
{
@@ -177,7 +179,7 @@ public class AMQStateManager implements AMQMethodListener
}
/**
- * Create and add a new waiter to the notifcation list.
+ * Create and add a new waiter to the notification list.
*
* @param states The waiter will attempt to wait for one of these desired set states to be achived.
*
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 79f438d35d..732480e1c9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -34,7 +34,7 @@ import java.util.Set;
*
* On construction the current state and a set of States to await for is provided.
*
- * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
* a desired state then await() returns immediately.
*
* Otherwise it will block for the set timeout for a desired state to be achieved.
@@ -48,9 +48,9 @@ public class StateWaiter extends BlockingWaiter<AMQState>
{
private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
- Set<AMQState> _awaitStates;
- private AMQState _startState;
- private AMQStateManager _stateManager;
+ private final Set<AMQState> _awaitStates;
+ private final AMQState _startState;
+ private final AMQStateManager _stateManager;
/**
*
@@ -78,9 +78,9 @@ public class StateWaiter extends BlockingWaiter<AMQState>
}
/**
- * Await for the requried State to be achieved within the default timeout.
+ * Await for the required State to be achieved within the default timeout.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achived.
+ * @throws AMQException The exception that prevented the required state from being achieved.
*/
public AMQState await() throws AMQException
{
@@ -88,13 +88,13 @@ public class StateWaiter extends BlockingWaiter<AMQState>
}
/**
- * Await for the requried State to be achieved.
+ * Await for the required State to be achieved.
*
* <b>It is the responsibility of this class to remove the waiter from the StateManager
*
- * @param timeout The time in milliseconds to wait for any of the states to be achived.
+ * @param timeout The time in milliseconds to wait for any of the states to be achieved.
* @return The achieved state that was requested.
- * @throws AMQException The exception that prevented the required state from being achived.
+ * @throws AMQException The exception that prevented the required state from being achieved.
*/
public AMQState await(long timeout) throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 208658a5ff..bec41644fc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -28,9 +28,8 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* BlockingWaiter is a 'rendezvous' which delegates handling of
@@ -64,6 +63,8 @@ import org.apache.qpid.protocol.AMQMethodListener;
*/
public abstract class BlockingWaiter<T>
{
+ private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class);
+
/** This flag is used to indicate that the blocked for method has been received. */
private volatile boolean _ready = false;
@@ -180,7 +181,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e)
{
- System.err.println(e.getMessage());
+ _logger.error(e.getMessage(), e);
// IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
// if (!_ready && timeout != -1)
// {
@@ -228,12 +229,12 @@ public abstract class BlockingWaiter<T>
}
/**
- * This is a callback, called when an error has occured that should interupt any waiter.
+ * This is a callback, called when an error has occurred that should interrupt any waiter.
* It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
*
* Once closed any notification of an exception will be ignored.
*
- * @param e The exception being propogated.
+ * @param e The exception being propagated.
*/
public void error(Exception e)
{
@@ -255,7 +256,7 @@ public abstract class BlockingWaiter<T>
}
else
{
- System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
+ _logger.error("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
}
if (_waiting.get())
@@ -272,7 +273,7 @@ public abstract class BlockingWaiter<T>
}
catch (InterruptedException e1)
{
- System.err.println(e.getMessage());
+ _logger.error(e1.getMessage(), e1);
}
}
_errorAck = false;