summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client/Client/State
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/State')
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/AMQState.cs35
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs52
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs251
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs29
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs31
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs33
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs74
-rw-r--r--qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs121
8 files changed, 626 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs
new file mode 100644
index 0000000000..67f8427fb2
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.State
+{
+ public enum AMQState
+ {
+ CONNECTION_NOT_STARTED,
+ CONNECTION_NOT_TUNED,
+ CONNECTION_NOT_OPENED,
+ CONNECTION_OPEN,
+ CONNECTION_CLOSING,
+ CONNECTION_CLOSED,
+ ALL // all is a special state used in the state manager. It is not valid to be "in" the state "all".
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs
new file mode 100644
index 0000000000..a464bbb6f5
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.State
+{
+ public class AMQStateChangedEvent
+ {
+ private readonly AMQState _oldState;
+
+ private readonly AMQState _newState;
+
+ public AMQStateChangedEvent(AMQState oldState, AMQState newState)
+ {
+ _oldState = oldState;
+ _newState = newState;
+ }
+
+ public AMQState OldState
+ {
+ get
+ {
+ return _oldState;
+ }
+ }
+
+ public AMQState NewState
+ {
+ get
+ {
+ return _newState;
+ }
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
new file mode 100644
index 0000000000..881e01e697
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Apache.Qpid.Client.Handler;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.Protocol.Listener;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Client.State
+{
+ public class AMQStateManager : IAMQMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager));
+
+ const bool InfoLoggingHack = true;
+
+ /// <summary>
+ /// The current state
+ /// </summary>
+ private AMQState _currentState;
+
+ /// <summary>
+ /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
+ /// The class must be a subclass of AMQFrame.
+ /// </summary>
+ private readonly IDictionary _state2HandlersMap;
+ private ArrayList _stateListeners;
+ private object _syncLock;
+
+ public AMQStateManager()
+ {
+ _syncLock = new object();
+ _state2HandlersMap = new Hashtable();
+ _stateListeners = ArrayList.Synchronized(new ArrayList(5));
+ _currentState = AMQState.CONNECTION_NOT_STARTED;
+ RegisterListeners();
+ }
+
+ private void RegisterListeners()
+ {
+ IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler();
+ IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler();
+ IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler();
+ IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler();
+ IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler();
+ IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler();
+ IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
+ IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
+ IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+ IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler();
+ IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler();
+
+ // We need to register a map for the null (i.e. all state) handlers otherwise you get
+ // a stack overflow in the handler searching code when you present it with a frame for which
+ // no handlers are registered.
+ _state2HandlersMap[AMQState.ALL] = new Hashtable();
+
+ {
+ Hashtable notStarted = new Hashtable();
+ notStarted[typeof(ConnectionStartBody)] = connectionStart;
+ notStarted[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted;
+ }
+ {
+ Hashtable notTuned = new Hashtable();
+ notTuned[typeof(ConnectionTuneBody)] = connectionTune;
+ notTuned[typeof(ConnectionSecureBody)] = connectionSecure;
+ notTuned[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned;
+ }
+ {
+ Hashtable notOpened = new Hashtable();
+ notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk;
+ notOpened[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened;
+ }
+ {
+ Hashtable open = new Hashtable();
+ open[typeof(ChannelCloseBody)] = channelClose;
+ open[typeof(ConnectionCloseBody)] = connectionClose;
+ open[typeof(BasicDeliverBody)] = basicDeliver;
+ open[typeof(BasicReturnBody)] = basicReturn;
+ open[typeof(QueueDeleteOkBody)] = queueDeleteOk;
+ open[typeof(QueuePurgeOkBody)] = queuePurgeOk;
+ _state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
+ }
+ {
+ Hashtable closing = new Hashtable();
+ closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk;
+ _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing;
+ }
+ }
+
+ public AMQState CurrentState
+ {
+ get
+ {
+ return _currentState;
+ }
+ }
+
+ /// <summary>
+ /// Changes the state.
+ /// </summary>
+ /// <param name="newState">The new state.</param>
+ /// <exception cref="AMQException">if there is an error changing state</exception>
+ public void ChangeState(AMQState newState)
+ {
+ if (InfoLoggingHack)
+ {
+ _logger.Debug("State changing to " + newState + " from old state " + _currentState);
+ }
+ _logger.Debug("State changing to " + newState + " from old state " + _currentState);
+ AMQState oldState = _currentState;
+ _currentState = newState;
+
+ lock ( _syncLock )
+ {
+ foreach ( IStateListener l in _stateListeners )
+ {
+ l.StateChanged(oldState, newState);
+ }
+ }
+ }
+
+ public void Error(Exception e)
+ {
+ _logger.Debug("State manager receive error notification: " + e);
+ lock ( _syncLock )
+ {
+ foreach ( IStateListener l in _stateListeners )
+ {
+ l.Error(e);
+ }
+ }
+ }
+
+ public bool MethodReceived(AMQMethodEvent evt)
+ {
+ _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType()));
+ IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method);
+ if (handler != null)
+ {
+ handler.MethodReceived(this, evt);
+ return true;
+ }
+ return false;
+ }
+
+ /// <summary>
+ /// Finds the state transition handler.
+ /// </summary>
+ /// <param name="currentState">State of the current.</param>
+ /// <param name="frame">The frame.</param>
+ /// <returns></returns>
+ /// <exception cref="IllegalStateTransitionException">if the state transition if not allowed</exception>
+ private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState,
+ AMQMethodBody frame)
+ {
+ Type clazz = frame.GetType();
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Looking for state transition handler for frame " + clazz);
+ }
+ IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState];
+
+ if (classToHandlerMap == null)
+ {
+ // if no specialised per state handler is registered look for a
+ // handler registered for "all" states
+ return FindStateTransitionHandler(AMQState.ALL, frame);
+ }
+ IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz];
+ if (handler == null)
+ {
+ if (currentState == AMQState.ALL)
+ {
+ _logger.Debug("No state transition handler defined for receiving frame " + frame);
+ return null;
+ }
+ else
+ {
+ // if no specialised per state handler is registered look for a
+ // handler registered for "all" states
+ return FindStateTransitionHandler(AMQState.ALL, frame);
+ }
+ }
+ else
+ {
+ return handler;
+ }
+ }
+
+ public void AddStateListener(IStateListener listener)
+ {
+ _logger.Debug("Adding state listener");
+ lock ( _syncLock )
+ {
+ _stateListeners.Add(listener);
+ }
+ }
+
+ public void RemoveStateListener(IStateListener listener)
+ {
+ lock ( _syncLock )
+ {
+ _stateListeners.Remove(listener);
+ }
+ }
+
+ public void AttainState(AMQState s)
+ {
+ if (_currentState != s)
+ {
+ StateWaiter sw = null;
+ try
+ {
+ _logger.Debug("Adding state wait to reach state " + s);
+ sw = new StateWaiter(s);
+ AddStateListener(sw);
+ sw.WaituntilStateHasChanged();
+ // at this point the state will have changed.
+ }
+ finally
+ {
+ RemoveStateListener(sw);
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs
new file mode 100644
index 0000000000..31e4b5046d
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.State
+{
+ public interface IAMQStateListener
+ {
+ void StateChanged(AMQStateChangedEvent evt);
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs
new file mode 100644
index 0000000000..0874f39665
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Client.Protocol;
+
+namespace Apache.Qpid.Client.State
+{
+ public interface IStateAwareMethodListener
+ {
+ void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt);
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs
new file mode 100644
index 0000000000..edd7382f93
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Apache.Qpid.Client.State
+{
+ public interface IStateListener
+ {
+ void StateChanged(AMQState oldState, AMQState newState);
+
+ void Error(Exception e);
+ }
+}
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs
new file mode 100644
index 0000000000..81de622617
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+
+namespace Apache.Qpid.Client.State
+{
+ [Serializable]
+ public class IllegalStateTransitionException : AMQException
+ {
+ private AMQState _originalState;
+
+ private Type _frame;
+
+ public IllegalStateTransitionException(AMQState originalState, Type frame)
+ : base("No valid state transition defined for receiving frame " + frame +
+ " from state " + originalState)
+ {
+ _originalState = originalState;
+ _frame = frame;
+ }
+
+ protected IllegalStateTransitionException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ _originalState = (AMQState)info.GetValue("OriginalState", typeof(AMQState));
+ _frame = (Type)info.GetValue("FrameType", typeof(Type));
+ }
+
+ public AMQState OriginalState
+ {
+ get
+ {
+ return _originalState;
+ }
+ }
+
+ public Type FrameType
+ {
+ get
+ {
+ return _frame;
+ }
+ }
+
+ public override void GetObjectData(SerializationInfo info, StreamingContext context)
+ {
+ base.GetObjectData(info, context);
+ info.AddValue("OriginalState", OriginalState);
+ info.AddValue("FrameType", FrameType);
+ }
+ }
+}
+
+
+
diff --git a/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs
new file mode 100644
index 0000000000..e739d0cb44
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Apache.Qpid.Client.Protocol;
+using log4net;
+
+namespace Apache.Qpid.Client.State
+{
+ public class StateWaiter : IStateListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter));
+
+ private readonly AMQState _state;
+ private AMQState _newState;
+
+ private volatile bool _newStateAchieved;
+
+ private volatile Exception _exception;
+
+ private ManualResetEvent _resetEvent = new ManualResetEvent(false);
+
+ public StateWaiter(AMQState state)
+ {
+ _state = state;
+ }
+
+ public void StateChanged(AMQState oldState, AMQState newState)
+ {
+ _newState = newState;
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("stateChanged called");
+ }
+ if (_state == newState)
+ {
+ _newStateAchieved = true;
+
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("New state reached so notifying monitor");
+ }
+ _resetEvent.Set();
+ }
+ }
+
+ public void Error(Exception e)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("exceptionThrown called");
+ }
+
+ _exception = e;
+ _resetEvent.Set();
+ }
+
+ public void WaituntilStateHasChanged()
+ {
+ //
+ // The guard is required in case we are woken up by a spurious
+ // notify().
+ //
+
+ TimeSpan waitTime = TimeSpan.FromMilliseconds(DefaultTimeouts.MaxWaitForState);
+ DateTime waitUntilTime = DateTime.Now + waitTime;
+
+ while ( !_newStateAchieved
+ && _exception == null
+ && waitTime.TotalMilliseconds > 0 )
+ {
+ _logger.Debug("State not achieved so waiting...");
+ try
+ {
+ _resetEvent.WaitOne(waitTime, true);
+ }
+ finally
+ {
+ if (!_newStateAchieved)
+ {
+ waitTime = waitUntilTime - DateTime.Now;
+ }
+ }
+ }
+
+ if (_exception != null)
+ {
+ _logger.Debug("Throwable reached state waiter: " + _exception);
+ if (_exception is AMQException)
+ throw _exception;
+ else
+ throw new AMQException("Error: " + _exception, _exception);
+ }
+
+ if (!_newStateAchieved)
+ {
+ string error = string.Format("State not achieved within permitted time. Current state: {0}, desired state: {1}", _state, _newState);
+ _logger.Warn(error);
+ throw new AMQException(error);
+ }
+ }
+ }
+}