diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/State')
8 files changed, 626 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs new file mode 100644 index 0000000000..67f8427fb2 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.State +{ + public enum AMQState + { + CONNECTION_NOT_STARTED, + CONNECTION_NOT_TUNED, + CONNECTION_NOT_OPENED, + CONNECTION_OPEN, + CONNECTION_CLOSING, + CONNECTION_CLOSED, + ALL // all is a special state used in the state manager. It is not valid to be "in" the state "all". + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs new file mode 100644 index 0000000000..a464bbb6f5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.State +{ + public class AMQStateChangedEvent + { + private readonly AMQState _oldState; + + private readonly AMQState _newState; + + public AMQStateChangedEvent(AMQState oldState, AMQState newState) + { + _oldState = oldState; + _newState = newState; + } + + public AMQState OldState + { + get + { + return _oldState; + } + } + + public AMQState NewState + { + get + { + return _newState; + } + } + + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs new file mode 100644 index 0000000000..881e01e697 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs @@ -0,0 +1,251 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Apache.Qpid.Client.Handler; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.State +{ + public class AMQStateManager : IAMQMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager)); + + const bool InfoLoggingHack = true; + + /// <summary> + /// The current state + /// </summary> + private AMQState _currentState; + + /// <summary> + /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler. + /// The class must be a subclass of AMQFrame. + /// </summary> + private readonly IDictionary _state2HandlersMap; + private ArrayList _stateListeners; + private object _syncLock; + + public AMQStateManager() + { + _syncLock = new object(); + _state2HandlersMap = new Hashtable(); + _stateListeners = ArrayList.Synchronized(new ArrayList(5)); + _currentState = AMQState.CONNECTION_NOT_STARTED; + RegisterListeners(); + } + + private void RegisterListeners() + { + IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler(); + IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler(); + IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler(); + IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler(); + IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler(); + IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler(); + IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler(); + IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler(); + IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler(); + IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler(); + IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler(); + + // We need to register a map for the null (i.e. all state) handlers otherwise you get + // a stack overflow in the handler searching code when you present it with a frame for which + // no handlers are registered. + _state2HandlersMap[AMQState.ALL] = new Hashtable(); + + { + Hashtable notStarted = new Hashtable(); + notStarted[typeof(ConnectionStartBody)] = connectionStart; + notStarted[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted; + } + { + Hashtable notTuned = new Hashtable(); + notTuned[typeof(ConnectionTuneBody)] = connectionTune; + notTuned[typeof(ConnectionSecureBody)] = connectionSecure; + notTuned[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned; + } + { + Hashtable notOpened = new Hashtable(); + notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk; + notOpened[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened; + } + { + Hashtable open = new Hashtable(); + open[typeof(ChannelCloseBody)] = channelClose; + open[typeof(ConnectionCloseBody)] = connectionClose; + open[typeof(BasicDeliverBody)] = basicDeliver; + open[typeof(BasicReturnBody)] = basicReturn; + open[typeof(QueueDeleteOkBody)] = queueDeleteOk; + open[typeof(QueuePurgeOkBody)] = queuePurgeOk; + _state2HandlersMap[AMQState.CONNECTION_OPEN] = open; + } + { + Hashtable closing = new Hashtable(); + closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk; + _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing; + } + } + + public AMQState CurrentState + { + get + { + return _currentState; + } + } + + /// <summary> + /// Changes the state. + /// </summary> + /// <param name="newState">The new state.</param> + /// <exception cref="AMQException">if there is an error changing state</exception> + public void ChangeState(AMQState newState) + { + if (InfoLoggingHack) + { + _logger.Debug("State changing to " + newState + " from old state " + _currentState); + } + _logger.Debug("State changing to " + newState + " from old state " + _currentState); + AMQState oldState = _currentState; + _currentState = newState; + + lock ( _syncLock ) + { + foreach ( IStateListener l in _stateListeners ) + { + l.StateChanged(oldState, newState); + } + } + } + + public void Error(Exception e) + { + _logger.Debug("State manager receive error notification: " + e); + lock ( _syncLock ) + { + foreach ( IStateListener l in _stateListeners ) + { + l.Error(e); + } + } + } + + public bool MethodReceived(AMQMethodEvent evt) + { + _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType())); + IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method); + if (handler != null) + { + handler.MethodReceived(this, evt); + return true; + } + return false; + } + + /// <summary> + /// Finds the state transition handler. + /// </summary> + /// <param name="currentState">State of the current.</param> + /// <param name="frame">The frame.</param> + /// <returns></returns> + /// <exception cref="IllegalStateTransitionException">if the state transition if not allowed</exception> + private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState, + AMQMethodBody frame) + { + Type clazz = frame.GetType(); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Looking for state transition handler for frame " + clazz); + } + IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState]; + + if (classToHandlerMap == null) + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz]; + if (handler == null) + { + if (currentState == AMQState.ALL) + { + _logger.Debug("No state transition handler defined for receiving frame " + frame); + return null; + } + else + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + } + else + { + return handler; + } + } + + public void AddStateListener(IStateListener listener) + { + _logger.Debug("Adding state listener"); + lock ( _syncLock ) + { + _stateListeners.Add(listener); + } + } + + public void RemoveStateListener(IStateListener listener) + { + lock ( _syncLock ) + { + _stateListeners.Remove(listener); + } + } + + public void AttainState(AMQState s) + { + if (_currentState != s) + { + StateWaiter sw = null; + try + { + _logger.Debug("Adding state wait to reach state " + s); + sw = new StateWaiter(s); + AddStateListener(sw); + sw.WaituntilStateHasChanged(); + // at this point the state will have changed. + } + finally + { + RemoveStateListener(sw); + } + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs new file mode 100644 index 0000000000..31e4b5046d --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Apache.Qpid.Client.State +{ + public interface IAMQStateListener + { + void StateChanged(AMQStateChangedEvent evt); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs new file mode 100644 index 0000000000..0874f39665 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Client.Protocol; + +namespace Apache.Qpid.Client.State +{ + public interface IStateAwareMethodListener + { + void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs new file mode 100644 index 0000000000..edd7382f93 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Apache.Qpid.Client.State +{ + public interface IStateListener + { + void StateChanged(AMQState oldState, AMQState newState); + + void Error(Exception e); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs new file mode 100644 index 0000000000..81de622617 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; + +namespace Apache.Qpid.Client.State +{ + [Serializable] + public class IllegalStateTransitionException : AMQException + { + private AMQState _originalState; + + private Type _frame; + + public IllegalStateTransitionException(AMQState originalState, Type frame) + : base("No valid state transition defined for receiving frame " + frame + + " from state " + originalState) + { + _originalState = originalState; + _frame = frame; + } + + protected IllegalStateTransitionException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + _originalState = (AMQState)info.GetValue("OriginalState", typeof(AMQState)); + _frame = (Type)info.GetValue("FrameType", typeof(Type)); + } + + public AMQState OriginalState + { + get + { + return _originalState; + } + } + + public Type FrameType + { + get + { + return _frame; + } + } + + public override void GetObjectData(SerializationInfo info, StreamingContext context) + { + base.GetObjectData(info, context); + info.AddValue("OriginalState", OriginalState); + info.AddValue("FrameType", FrameType); + } + } +} + + + diff --git a/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs new file mode 100644 index 0000000000..e739d0cb44 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Apache.Qpid.Client.Protocol; +using log4net; + +namespace Apache.Qpid.Client.State +{ + public class StateWaiter : IStateListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter)); + + private readonly AMQState _state; + private AMQState _newState; + + private volatile bool _newStateAchieved; + + private volatile Exception _exception; + + private ManualResetEvent _resetEvent = new ManualResetEvent(false); + + public StateWaiter(AMQState state) + { + _state = state; + } + + public void StateChanged(AMQState oldState, AMQState newState) + { + _newState = newState; + if (_logger.IsDebugEnabled) + { + _logger.Debug("stateChanged called"); + } + if (_state == newState) + { + _newStateAchieved = true; + + if (_logger.IsDebugEnabled) + { + _logger.Debug("New state reached so notifying monitor"); + } + _resetEvent.Set(); + } + } + + public void Error(Exception e) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("exceptionThrown called"); + } + + _exception = e; + _resetEvent.Set(); + } + + public void WaituntilStateHasChanged() + { + // + // The guard is required in case we are woken up by a spurious + // notify(). + // + + TimeSpan waitTime = TimeSpan.FromMilliseconds(DefaultTimeouts.MaxWaitForState); + DateTime waitUntilTime = DateTime.Now + waitTime; + + while ( !_newStateAchieved + && _exception == null + && waitTime.TotalMilliseconds > 0 ) + { + _logger.Debug("State not achieved so waiting..."); + try + { + _resetEvent.WaitOne(waitTime, true); + } + finally + { + if (!_newStateAchieved) + { + waitTime = waitUntilTime - DateTime.Now; + } + } + } + + if (_exception != null) + { + _logger.Debug("Throwable reached state waiter: " + _exception); + if (_exception is AMQException) + throw _exception; + else + throw new AMQException("Error: " + _exception, _exception); + } + + if (!_newStateAchieved) + { + string error = string.Format("State not achieved within permitted time. Current state: {0}, desired state: {1}", _state, _newState); + _logger.Warn(error); + throw new AMQException(error); + } + } + } +} |