summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java149
1 files changed, 78 insertions, 71 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 8b8453a1b0..4695b195d5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -20,103 +20,110 @@
*/
package org.apache.qpid.client.state;
+import org.apache.qpid.client.util.BlockingWaiter;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.AMQException;
-
-import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Set;
/**
- * Waits for a particular state to be reached.
+ * This is an implementation of the {@link BlockingWaiter} to provide error handing and a waiting mechanism for state
+ * changes.
+ *
+ * On construction the current state and a set of States to await for is provided.
+ *
+ * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * a desired state then await() returns immediately.
+ *
+ * Otherwise it will block for the set timeout for a desired state to be achieved.
+ *
+ * The state changes are notified via the {@link #process} method.
+ *
+ * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method.
+ *
*/
-public class StateWaiter implements StateListener
+public class StateWaiter extends BlockingWaiter<AMQState>
{
private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
- private final AMQState _state;
-
- private volatile boolean _newStateAchieved;
-
- private volatile Throwable _throwable;
-
- private final Object _monitor = new Object();
- private static final long TIME_OUT = 1000 * 60 * 2;
-
- public StateWaiter(AMQState state)
+ Set<AMQState> _awaitStates;
+ private AMQState _startState;
+ private AMQStateManager _stateManager;
+
+ /**
+ *
+ * @param stateManager The StateManager
+ * @param currentState
+ * @param awaitStates
+ */
+ public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates)
{
- _state = state;
+ _logger.info("New StateWaiter :" + currentState + ":" + awaitStates);
+ _stateManager = stateManager;
+ _awaitStates = awaitStates;
+ _startState = currentState;
}
- public void waituntilStateHasChanged() throws AMQException
+ /**
+ * When the state is changed this StateWaiter is notified to process the change.
+ *
+ * @param state The new state that has been achieved.
+ * @return
+ */
+ public boolean process(AMQState state)
{
- synchronized (_monitor)
- {
- //
- // The guard is required in case we are woken up by a spurious
- // notify().
- //
- while (!_newStateAchieved && (_throwable == null))
- {
- try
- {
- _logger.debug("State " + _state + " not achieved so waiting...");
- _monitor.wait(TIME_OUT);
- // fixme this won't cause the timeout to exit the loop. need to set _throwable
- }
- catch (InterruptedException e)
- {
- _logger.debug("Interrupted exception caught while waiting: " + e, e);
- }
- }
- }
+ return _awaitStates.contains(state);
+ }
- if (_throwable != null)
- {
- _logger.debug("Throwable reached state waiter: " + _throwable);
- if (_throwable instanceof AMQException)
- {
- throw (AMQException) _throwable;
- }
- else
- {
- throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
- }
- }
+ /**
+ * Await for the requried State to be achieved within the default timeout.
+ * @return The achieved state that was requested.
+ * @throws AMQException The exception that prevented the required state from being achived.
+ */
+ public AMQState await() throws AMQException
+ {
+ return await(_stateManager.getWaitTimeout());
}
- public void stateChanged(AMQState oldState, AMQState newState)
+ /**
+ * Await for the requried State to be achieved.
+ *
+ * <b>It is the responsibility of this class to remove the waiter from the StateManager
+ *
+ * @param timeout The time in milliseconds to wait for any of the states to be achived.
+ * @return The achieved state that was requested.
+ * @throws AMQException The exception that prevented the required state from being achived.
+ */
+ public AMQState await(long timeout) throws AMQException
{
- synchronized (_monitor)
+ try
{
- if (_logger.isDebugEnabled())
+ if (process(_startState))
{
- _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState);
+ return _startState;
}
- if (_state == newState)
+ try
{
- _newStateAchieved = true;
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("New state reached so notifying monitor");
- }
+ return (AMQState) block(timeout);
+ }
+ catch (FailoverException e)
+ {
+ _logger.error("Failover occured whilst waiting for states:" + _awaitStates);
- _monitor.notifyAll();
+ e.printStackTrace();
+ return null;
}
}
- }
-
- public void error(Throwable t)
- {
- synchronized (_monitor)
+ finally
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("exceptionThrown called");
- }
+ //Prevent any more errors being notified to this waiter.
+ close();
- _throwable = t;
- _monitor.notifyAll();
+ //Remove the waiter from the notifcation list in the statee manager
+ _stateManager.removeWaiter(this);
}
}
}