diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java | 185 |
1 files changed, 96 insertions, 89 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index eda1a1f5fd..f8645139f2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -28,15 +28,28 @@ import org.apache.qpid.protocol.AMQMethodListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** - * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler - * there is a separate state manager. + * The state manager is responsible for managing the state of the protocol session. <p/> + * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager. + * + * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that + * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around. + * + * The StateManager works by any component can wait for a state change to occur by using the following sequence. + * + * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states); + * <li> // Perform action that will cause state change + * <li>waiter.await(); + * + * The two step process is required as there is an inherit race condition between starting a process that will cause + * the state to change and then attempting to wait for that change. The interest in the change must be first set up so + * that any asynchrous errors that occur can be delivered to the correct waiters. */ -public class AMQStateManager +public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class); @@ -45,16 +58,13 @@ public class AMQStateManager /** The current state */ private AMQState _currentState; - - /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of - * AMQFrame. - */ - - private final Object _stateLock = new Object(); + private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); + protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>(); + private Exception _lastException; + public AMQStateManager() { this(null); @@ -62,18 +72,15 @@ public class AMQStateManager public AMQStateManager(AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession); + this(AMQState.CONNECTION_NOT_STARTED, protocolSession); } - protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession) + protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession) { _protocolSession = protocolSession; _currentState = state; - } - - public AMQState getCurrentState() { return _currentState; @@ -86,107 +93,107 @@ public class AMQStateManager synchronized (_stateLock) { _currentState = newState; - _stateLock.notifyAll(); + + _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters); + + for (StateWaiter waiter : _waiters) + { + waiter.received(newState); + } } } - public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { - B method = evt.getMethod(); - + // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId()); return true; } - - public void attainState(final AMQState s) throws AMQException + /** + * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted. + * + * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the + * connection to the network. + * + * @param session The new protocol session + */ + public void setProtocolSession(AMQProtocolSession session) { - synchronized (_stateLock) + if (_logger.isInfoEnabled()) { - final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; - long waitTime = MAXIMUM_STATE_WAIT_TIME; - - while ((_currentState != s) && (waitTime > 0)) - { - try - { - _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); - } - catch (InterruptedException e) - { - _logger.warn("Thread interrupted"); - } - - if (_currentState != s) - { - waitTime = waitUntilTime - System.currentTimeMillis(); - } - } - - if (_currentState != s) - { - _logger.warn("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s, null); - } + _logger.info("Setting ProtocolSession:" + session); } - - // at this point the state will have changed. + _protocolSession = session; } - public AMQProtocolSession getProtocolSession() + /** + * Propogate error to waiters + * + * @param error The error to propogate. + */ + public void error(Exception error) { - return _protocolSession; + if (_waiters.size() == 0) + { + _logger.error("No Waiters for error saving as last error:" + error.getMessage()); + _lastException = error; + } + for (StateWaiter waiter : _waiters) + { + _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage()); + waiter.error(error); + } } - public void setProtocolSession(AMQProtocolSession session) + /** + * This provides a single place that the maximum time for state change to occur can be accessed. + * It is currently set via System property amqj.MaximumStateWait + * + * @return long Milliseconds value for a timeout + */ + public long getWaitTimeout() { - _protocolSession = session; + return MAXIMUM_STATE_WAIT_TIME; } - public MethodRegistry getMethodRegistry() + /** + * Create and add a new waiter to the notifcation list. + * + * @param states The waiter will attempt to wait for one of these desired set states to be achived. + * + * @return the created StateWaiter. + */ + public StateWaiter createWaiter(Set<AMQState> states) { - return getProtocolSession().getMethodRegistry(); + final StateWaiter waiter; + synchronized (_stateLock) + { + waiter = new StateWaiter(this, _currentState, states); + + _waiters.add(waiter); + } + + return waiter; } - public AMQState attainState(Set<AMQState> stateSet) throws AMQException + /** + * Remove the waiter from the notification list. + * + * @param waiter The waiter to remove. + */ + public void removeWaiter(StateWaiter waiter) { synchronized (_stateLock) { - final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; - long waitTime = MAXIMUM_STATE_WAIT_TIME; - - while (!stateSet.contains(_currentState) && (waitTime > 0)) - { - try - { - _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); - } - catch (InterruptedException e) - { - _logger.warn("Thread interrupted"); - } - - if (!stateSet.contains(_currentState)) - { - waitTime = waitUntilTime - System.currentTimeMillis(); - } - } - - if (!stateSet.contains(_currentState)) - { - _logger.warn("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + stateSet); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + stateSet, null); - } - return _currentState; + _waiters.remove(waiter); } + } - + public Exception getLastException() + { + return _lastException; } } |