diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java | 149 |
1 files changed, 78 insertions, 71 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 8b8453a1b0..4695b195d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -20,103 +20,110 @@ */ package org.apache.qpid.client.state; +import org.apache.qpid.client.util.BlockingWaiter; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.AMQException; - -import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import java.util.Set; /** - * Waits for a particular state to be reached. + * This is an implementation of the {@link BlockingWaiter} to provide error handing and a waiting mechanism for state + * changes. + * + * On construction the current state and a set of States to await for is provided. + * + * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is + * a desired state then await() returns immediately. + * + * Otherwise it will block for the set timeout for a desired state to be achieved. + * + * The state changes are notified via the {@link #process} method. + * + * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method. + * */ -public class StateWaiter implements StateListener +public class StateWaiter extends BlockingWaiter<AMQState> { private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class); - private final AMQState _state; - - private volatile boolean _newStateAchieved; - - private volatile Throwable _throwable; - - private final Object _monitor = new Object(); - private static final long TIME_OUT = 1000 * 60 * 2; - - public StateWaiter(AMQState state) + Set<AMQState> _awaitStates; + private AMQState _startState; + private AMQStateManager _stateManager; + + /** + * + * @param stateManager The StateManager + * @param currentState + * @param awaitStates + */ + public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates) { - _state = state; + _logger.info("New StateWaiter :" + currentState + ":" + awaitStates); + _stateManager = stateManager; + _awaitStates = awaitStates; + _startState = currentState; } - public void waituntilStateHasChanged() throws AMQException + /** + * When the state is changed this StateWaiter is notified to process the change. + * + * @param state The new state that has been achieved. + * @return + */ + public boolean process(AMQState state) { - synchronized (_monitor) - { - // - // The guard is required in case we are woken up by a spurious - // notify(). - // - while (!_newStateAchieved && (_throwable == null)) - { - try - { - _logger.debug("State " + _state + " not achieved so waiting..."); - _monitor.wait(TIME_OUT); - // fixme this won't cause the timeout to exit the loop. need to set _throwable - } - catch (InterruptedException e) - { - _logger.debug("Interrupted exception caught while waiting: " + e, e); - } - } - } + return _awaitStates.contains(state); + } - if (_throwable != null) - { - _logger.debug("Throwable reached state waiter: " + _throwable); - if (_throwable instanceof AMQException) - { - throw (AMQException) _throwable; - } - else - { - throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught. - } - } + /** + * Await for the requried State to be achieved within the default timeout. + * @return The achieved state that was requested. + * @throws AMQException The exception that prevented the required state from being achived. + */ + public AMQState await() throws AMQException + { + return await(_stateManager.getWaitTimeout()); } - public void stateChanged(AMQState oldState, AMQState newState) + /** + * Await for the requried State to be achieved. + * + * <b>It is the responsibility of this class to remove the waiter from the StateManager + * + * @param timeout The time in milliseconds to wait for any of the states to be achived. + * @return The achieved state that was requested. + * @throws AMQException The exception that prevented the required state from being achived. + */ + public AMQState await(long timeout) throws AMQException { - synchronized (_monitor) + try { - if (_logger.isDebugEnabled()) + if (process(_startState)) { - _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState); + return _startState; } - if (_state == newState) + try { - _newStateAchieved = true; - - if (_logger.isDebugEnabled()) - { - _logger.debug("New state reached so notifying monitor"); - } + return (AMQState) block(timeout); + } + catch (FailoverException e) + { + _logger.error("Failover occured whilst waiting for states:" + _awaitStates); - _monitor.notifyAll(); + e.printStackTrace(); + return null; } } - } - - public void error(Throwable t) - { - synchronized (_monitor) + finally { - if (_logger.isDebugEnabled()) - { - _logger.debug("exceptionThrown called"); - } + //Prevent any more errors being notified to this waiter. + close(); - _throwable = t; - _monitor.notifyAll(); + //Remove the waiter from the notifcation list in the statee manager + _stateManager.removeWaiter(this); } } } |