diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs new file mode 100644 index 0000000000..c51538b70e --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs @@ -0,0 +1,318 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Failover; +using Apache.Qpid.Client.Protocol.Listener; +using Apache.Qpid.Client.State; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Protocol +{ + /// <summary> + /// AMQProtocolListener + /// + /// <p/>Fail-over state transition rules... + /// + /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order + /// to be able to send errors during failover back to the client application. The session won't be available in the case + /// when failing over due to a Connection.Redirect message from the broker. + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Track fail over state of a connection. + /// <tr><td> Manage method listeners. <td> IAMQMethodListener + /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler + /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener + /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener + /// </table> + /// + /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being + /// triggered, when it should not be. + /// + /// </summary> + public class AMQProtocolListener : IProtocolListener + { + /// <summary>Used for debugging.</summary> + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); + + /// <summary> + /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it, + /// the failover process is handed off to this handler. + /// </summary> + private FailoverHandler _failoverHandler; + + /// <summary>Tracks the current fail-over state.</summary> + internal FailoverState _failoverState = FailoverState.NOT_STARTED; + + internal FailoverState FailoverState + { + get { return _failoverState; } + set { _failoverState = value; } + } + + internal ManualResetEvent FailoverLatch; + + AMQConnection _connection; + AMQStateManager _stateManager; + + public AMQStateManager StateManager + { + get { return _stateManager; } + set { _stateManager = value; } + } + + private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); + + AMQProtocolSession _protocolSession = null; + + private readonly Object _lock = new Object(); + + public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } + + public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) + { + _connection = connection; + _stateManager = stateManager; + _failoverHandler = new FailoverHandler(connection); + } + + public void OnMessage(IDataBlock message) + { + // Handle incorrect protocol version. + if (message is ProtocolInitiation) + { + string error = String.Format("Protocol mismatch - {0}", message.ToString()); + AMQException e = new AMQProtocolHeaderException(error); + _log.Error("Closing connection because of protocol mismatch", e); + //_protocolSession.CloseProtocolSession(); + _stateManager.Error(e); + return; + } + + AMQFrame frame = (AMQFrame)message; + + if (frame.BodyFrame is AMQMethodBody) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Method frame received: " + frame); + } + AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession); + try + { + bool wasAnyoneInterested = false; + lock (_frameListeners.SyncRoot) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested; + } + } + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + } + } + catch (Exception e) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + listener.Error(e); + } + } + } + else if (frame.BodyFrame is ContentHeaderBody) + { + _protocolSession.MessageContentHeaderReceived(frame.Channel, + (ContentHeaderBody)frame.BodyFrame); + } + else if (frame.BodyFrame is ContentBody) + { + _protocolSession.MessageContentBodyReceived(frame.Channel, + (ContentBody)frame.BodyFrame); + } + else if (frame.BodyFrame is HeartbeatBody) + { + _log.Debug("HeartBeat received"); + } + } + + /// <summary> + /// Receives notification of any IO exceptions on the connection. + /// + /// <p/>Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then + /// all method listeners and the application connection object are notified of the connection failure exception. + /// + /// <p/>All other exception types are propagated to all method listeners. + /// </summary> + public void OnException(Exception cause) + { + _log.Warn("public void OnException(Exception cause = " + cause + "): called"); + + // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also + // ensures that this exception is fully propagated to all listeners, before another one can be processed. + lock (_lock) + { + if (cause is AMQConnectionClosedException || cause is System.IO.IOException) + { + // Try a fail-over because the connection has failed. + FailoverState failoverState = AttemptFailover(); + + // Check if the fail-over has failed, in which case notify all method listeners of the exception. + // The application connection object is also notified of the failure of the connection with the exception. + if (failoverState == FailoverState.FAILED) + { + _log.Debug("Fail-over has failed. Notifying all method listeners of the exception."); + + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + PropagateExceptionToWaiters(amqe); + _connection.ExceptionReceived(cause); + } + } + // Notify all method listeners of the exception. + else + { + PropagateExceptionToWaiters(cause); + _connection.ExceptionReceived(cause); + } + } + } + + /// <summary> + /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been + /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress + /// this method allows it to continue to run and will do nothing. + /// + /// <p/>This method should only be called when the connection has been remotely closed. + /// </summary> + /// + /// <returns>The fail-over state at the end of this attempt.</returns> + private FailoverState AttemptFailover() + { + _log.Debug("private void AttemptFailover(): called"); + _log.Debug("_failoverState = " + _failoverState); + + // Ensure that the connection stops sending heart beats, if it still is. + _connection.StopHeartBeatThread(); + + // Check that the connection policy allows fail-over to be attempted. + if (!_connection.IsFailoverAllowed) + { + _log.Debug("Connection does not allowed to failover"); + _connection.ExceptionReceived( + new AMQDisconnectedException("Broker closed connection and reconnection is not permitted.")); + } + + // Check if connection was closed deliberately by the application, in which case no fail-over is attempted. + if (_connection.Closed) + { + return _failoverState; + } + + // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is + // advanced to 'in progress' + if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed) + { + _log.Info("Starting the fail-over process."); + + _failoverState = FailoverState.IN_PROGRESS; + StartFailoverThread(); + } + + return _failoverState; + } + + /// <summary> + /// There are two cases where we have other threads potentially blocking for events to be handled by this + /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a + /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can + /// react appropriately. + /// + /// <param name="e">the exception to propagate</param> + /// </summary> + public void PropagateExceptionToWaiters(Exception e) + { + // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over. + _stateManager.Error(e); + lock ( _lock ) + { + foreach ( IAMQMethodListener listener in _frameListeners ) + { + listener.Error(e); + } + } + } + + public void AddFrameListener(IAMQMethodListener listener) + { + lock ( _lock ) + { + _frameListeners.Add(listener); + } + } + + public void RemoveFrameListener(IAMQMethodListener listener) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Removing frame listener: " + listener.ToString()); + } + lock ( _lock ) + { + _frameListeners.Remove(listener); + } + } + + public void BlockUntilNotFailingOver() + { + if (FailoverLatch != null) + { + FailoverLatch.WaitOne(); + } + } + + /// <summary> + /// "Failover" for redirection. + /// </summary> + /// <param name="host"></param> + /// <param name="port"></param> + public void Failover(string host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + StartFailoverThread(); + } + + private void StartFailoverThread() + { + Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run)); + failoverThread.Name = "Failover"; + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.IsBackground = false; + failoverThread.Start(); + } + } +} |