summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java185
1 files changed, 96 insertions, 89 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index eda1a1f5fd..f8645139f2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -28,15 +28,28 @@ import org.apache.qpid.protocol.AMQMethodListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
- * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
- * there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/>
+ * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager.
+ *
+ * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that
+ * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around.
+ *
+ * The StateManager works by any component can wait for a state change to occur by using the following sequence.
+ *
+ * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states);
+ * <li> // Perform action that will cause state change
+ * <li>waiter.await();
+ *
+ * The two step process is required as there is an inherit race condition between starting a process that will cause
+ * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
+ * that any asynchrous errors that occur can be delivered to the correct waiters.
*/
-public class AMQStateManager
+public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
@@ -45,16 +58,13 @@ public class AMQStateManager
/** The current state */
private AMQState _currentState;
-
- /**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
- * AMQFrame.
- */
-
-
private final Object _stateLock = new Object();
+
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
+ protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
+ private Exception _lastException;
+
public AMQStateManager()
{
this(null);
@@ -62,18 +72,15 @@ public class AMQStateManager
public AMQStateManager(AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+ this(AMQState.CONNECTION_NOT_STARTED, protocolSession);
}
- protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+ protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession)
{
_protocolSession = protocolSession;
_currentState = state;
-
}
-
-
public AMQState getCurrentState()
{
return _currentState;
@@ -86,107 +93,107 @@ public class AMQStateManager
synchronized (_stateLock)
{
_currentState = newState;
- _stateLock.notifyAll();
+
+ _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters);
+
+ for (StateWaiter waiter : _waiters)
+ {
+ waiter.received(newState);
+ }
}
}
-
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
-
B method = evt.getMethod();
-
+
// StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
return true;
}
-
- public void attainState(final AMQState s) throws AMQException
+ /**
+ * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+ *
+ * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
+ * connection to the network.
+ *
+ * @param session The new protocol session
+ */
+ public void setProtocolSession(AMQProtocolSession session)
{
- synchronized (_stateLock)
+ if (_logger.isInfoEnabled())
{
- final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
- long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
- while ((_currentState != s) && (waitTime > 0))
- {
- try
- {
- _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
- }
- catch (InterruptedException e)
- {
- _logger.warn("Thread interrupted");
- }
-
- if (_currentState != s)
- {
- waitTime = waitUntilTime - System.currentTimeMillis();
- }
- }
-
- if (_currentState != s)
- {
- _logger.warn("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s);
- throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s, null);
- }
+ _logger.info("Setting ProtocolSession:" + session);
}
-
- // at this point the state will have changed.
+ _protocolSession = session;
}
- public AMQProtocolSession getProtocolSession()
+ /**
+ * Propogate error to waiters
+ *
+ * @param error The error to propogate.
+ */
+ public void error(Exception error)
{
- return _protocolSession;
+ if (_waiters.size() == 0)
+ {
+ _logger.error("No Waiters for error saving as last error:" + error.getMessage());
+ _lastException = error;
+ }
+ for (StateWaiter waiter : _waiters)
+ {
+ _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
+ waiter.error(error);
+ }
}
- public void setProtocolSession(AMQProtocolSession session)
+ /**
+ * This provides a single place that the maximum time for state change to occur can be accessed.
+ * It is currently set via System property amqj.MaximumStateWait
+ *
+ * @return long Milliseconds value for a timeout
+ */
+ public long getWaitTimeout()
{
- _protocolSession = session;
+ return MAXIMUM_STATE_WAIT_TIME;
}
- public MethodRegistry getMethodRegistry()
+ /**
+ * Create and add a new waiter to the notifcation list.
+ *
+ * @param states The waiter will attempt to wait for one of these desired set states to be achived.
+ *
+ * @return the created StateWaiter.
+ */
+ public StateWaiter createWaiter(Set<AMQState> states)
{
- return getProtocolSession().getMethodRegistry();
+ final StateWaiter waiter;
+ synchronized (_stateLock)
+ {
+ waiter = new StateWaiter(this, _currentState, states);
+
+ _waiters.add(waiter);
+ }
+
+ return waiter;
}
- public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+ /**
+ * Remove the waiter from the notification list.
+ *
+ * @param waiter The waiter to remove.
+ */
+ public void removeWaiter(StateWaiter waiter)
{
synchronized (_stateLock)
{
- final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
- long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
- while (!stateSet.contains(_currentState) && (waitTime > 0))
- {
- try
- {
- _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
- }
- catch (InterruptedException e)
- {
- _logger.warn("Thread interrupted");
- }
-
- if (!stateSet.contains(_currentState))
- {
- waitTime = waitUntilTime - System.currentTimeMillis();
- }
- }
-
- if (!stateSet.contains(_currentState))
- {
- _logger.warn("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + stateSet);
- throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + stateSet, null);
- }
- return _currentState;
+ _waiters.remove(waiter);
}
+ }
-
+ public Exception getLastException()
+ {
+ return _lastException;
}
}