diff options
Diffstat (limited to 'java')
5 files changed, 59 insertions, 34 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 92f9ebe07c..b1a22155d6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -36,11 +36,13 @@ import javax.jms.XASession; import javax.net.ssl.SSLContext; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; @@ -69,8 +71,30 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public void closeConnection(long timeout) throws JMSException, AMQException { - _conn.getProtocolHandler().closeConnection(timeout); + final AMQStateManager stateManager = _conn.getProtocolHandler().getStateManager(); + final AMQState currentState = stateManager.getCurrentState(); + if (currentState.equals(AMQState.CONNECTION_CLOSED)) + { + _logger.debug("Connection already closed."); + } + else if (currentState.equals(AMQState.CONNECTION_CLOSING)) + { + _logger.debug("Connection already closing, awaiting closed state."); + final StateWaiter closeWaiter = new StateWaiter(stateManager, currentState, EnumSet.of(AMQState.CONNECTION_CLOSED)); + try + { + closeWaiter.await(timeout); + } + catch (AMQTimeoutException te) + { + throw new AMQTimeoutException("Close did not complete in timely fashion", te); + } + } + else + { + _conn.getProtocolHandler().closeConnection(timeout); + } } public AMQConnectionDelegate_8_0(AMQConnection conn) diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index de21b874e8..284954edba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -665,22 +665,21 @@ public class AMQProtocolHandler implements ProtocolEngine * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed * anyway. * - * @param timeout The timeout to wait for an acknowledgement to the close request. + * @param timeout The timeout to wait for an acknowledgment to the close request. * * @throws AMQException If the close fails for any reason. */ public void closeConnection(long timeout) throws AMQException { - ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."), 0, 0); - - final AMQFrame frame = body.generateFrame(0); - - //If the connection is already closed then don't do a syncWrite if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) { + // Connection is already closed then don't do a syncWrite try { + final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client is closing the connection."), 0, 0); + final AMQFrame frame = body.generateFrame(0); + syncWrite(frame, ConnectionCloseOkBody.class, timeout); _network.close(); closed(); @@ -691,10 +690,9 @@ public class AMQProtocolHandler implements ProtocolEngine } catch (FailoverException e) { - _logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway."); + _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway."); } } - } /** @return the number of bytes read from this protocol session */ diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 9c7d62670c..0d6fc727c1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import java.util.Set; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.io.IOException; /** * The state manager is responsible for managing the state of the protocol session. <p/> @@ -48,7 +47,7 @@ import java.io.IOException; * * The two step process is required as there is an inherit race condition between starting a process that will cause * the state to change and then attempting to wait for that change. The interest in the change must be first set up so - * that any asynchrous errors that occur can be delivered to the correct waiters. + * that any asynchronous errors that occur can be delivered to the correct waiters. */ public class AMQStateManager implements AMQMethodListener { @@ -84,7 +83,10 @@ public class AMQStateManager implements AMQMethodListener public AMQState getCurrentState() { - return _currentState; + synchronized (_stateLock) + { + return _currentState; + } } public void changeState(AMQState newState) @@ -114,7 +116,7 @@ public class AMQStateManager implements AMQMethodListener } /** - * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted. + * Setting of the ProtocolSession will be required when Failover has been successfully completed. * * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the * connection to the network. @@ -131,9 +133,9 @@ public class AMQStateManager implements AMQMethodListener } /** - * Propogate error to waiters + * Propagate error to waiters * - * @param error The error to propogate. + * @param error The error to propagate. */ public void error(Exception error) { @@ -177,7 +179,7 @@ public class AMQStateManager implements AMQMethodListener } /** - * Create and add a new waiter to the notifcation list. + * Create and add a new waiter to the notification list. * * @param states The waiter will attempt to wait for one of these desired set states to be achived. * diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 79f438d35d..732480e1c9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -34,7 +34,7 @@ import java.util.Set; * * On construction the current state and a set of States to await for is provided. * - * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is + * When await() is called the state at construction is compared against the awaitStates. If the state at construction is * a desired state then await() returns immediately. * * Otherwise it will block for the set timeout for a desired state to be achieved. @@ -48,9 +48,9 @@ public class StateWaiter extends BlockingWaiter<AMQState> { private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class); - Set<AMQState> _awaitStates; - private AMQState _startState; - private AMQStateManager _stateManager; + private final Set<AMQState> _awaitStates; + private final AMQState _startState; + private final AMQStateManager _stateManager; /** * @@ -78,9 +78,9 @@ public class StateWaiter extends BlockingWaiter<AMQState> } /** - * Await for the requried State to be achieved within the default timeout. + * Await for the required State to be achieved within the default timeout. * @return The achieved state that was requested. - * @throws AMQException The exception that prevented the required state from being achived. + * @throws AMQException The exception that prevented the required state from being achieved. */ public AMQState await() throws AMQException { @@ -88,13 +88,13 @@ public class StateWaiter extends BlockingWaiter<AMQState> } /** - * Await for the requried State to be achieved. + * Await for the required State to be achieved. * * <b>It is the responsibility of this class to remove the waiter from the StateManager * - * @param timeout The time in milliseconds to wait for any of the states to be achived. + * @param timeout The time in milliseconds to wait for any of the states to be achieved. * @return The achieved state that was requested. - * @throws AMQException The exception that prevented the required state from being achived. + * @throws AMQException The exception that prevented the required state from being achieved. */ public AMQState await(long timeout) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 208658a5ff..bec41644fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -28,9 +28,8 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * BlockingWaiter is a 'rendezvous' which delegates handling of @@ -64,6 +63,8 @@ import org.apache.qpid.protocol.AMQMethodListener; */ public abstract class BlockingWaiter<T> { + private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class); + /** This flag is used to indicate that the blocked for method has been received. */ private volatile boolean _ready = false; @@ -180,7 +181,7 @@ public abstract class BlockingWaiter<T> } catch (InterruptedException e) { - System.err.println(e.getMessage()); + _logger.error(e.getMessage(), e); // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess // if (!_ready && timeout != -1) // { @@ -228,12 +229,12 @@ public abstract class BlockingWaiter<T> } /** - * This is a callback, called when an error has occured that should interupt any waiter. + * This is a callback, called when an error has occurred that should interrupt any waiter. * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads. * * Once closed any notification of an exception will be ignored. * - * @param e The exception being propogated. + * @param e The exception being propagated. */ public void error(Exception e) { @@ -255,7 +256,7 @@ public abstract class BlockingWaiter<T> } else { - System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage()); + _logger.error("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage()); } if (_waiting.get()) @@ -272,7 +273,7 @@ public abstract class BlockingWaiter<T> } catch (InterruptedException e1) { - System.err.println(e.getMessage()); + _logger.error(e1.getMessage(), e1); } } _errorAck = false; |