/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections; using System.Threading; using log4net; using Apache.Qpid.Client.Failover; using Apache.Qpid.Client.Protocol.Listener; using Apache.Qpid.Client.State; using Apache.Qpid.Framing; namespace Apache.Qpid.Client.Protocol { /// /// AMQProtocolListener /// ///

Fail-over state transition rules... /// ///

The failover handler is created when the session is created since it needs a reference to the IoSession in order /// to be able to send errors during failover back to the client application. The session won't be available in the case /// when failing over due to a Connection.Redirect message from the broker. /// ///

///
CRC Card
Responsibilities Collaborations ///
Track fail over state of a connection. ///
Manage method listeners. IAMQMethodListener ///
Receive notification of all IO errors on a connection. IoHandler ///
Inform method listeners of all method events on a connection. IAMQMethodListener ///
Inform method listeners of all error events on a connection. IAMQMethodListener ///
/// /// Todo: The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being /// triggered, when it should not be. /// ///

public class AMQProtocolListener : IProtocolListener { /// Used for debugging. private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); /// /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it, /// the failover process is handed off to this handler. /// private FailoverHandler _failoverHandler; /// Tracks the current fail-over state. internal FailoverState _failoverState = FailoverState.NOT_STARTED; internal FailoverState FailoverState { get { return _failoverState; } set { _failoverState = value; } } internal ManualResetEvent FailoverLatch; AMQConnection _connection; AMQStateManager _stateManager; public AMQStateManager StateManager { get { return _stateManager; } set { _stateManager = value; } } private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); AMQProtocolSession _protocolSession = null; private readonly Object _lock = new Object(); public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) { _connection = connection; _stateManager = stateManager; _failoverHandler = new FailoverHandler(connection); } public void OnMessage(IDataBlock message) { // Handle incorrect protocol version. if (message is ProtocolInitiation) { string error = String.Format("Protocol mismatch - {0}", message.ToString()); AMQException e = new AMQProtocolHeaderException(error); _log.Error("Closing connection because of protocol mismatch", e); //_protocolSession.CloseProtocolSession(); _stateManager.Error(e); return; } AMQFrame frame = (AMQFrame)message; if (frame.BodyFrame is AMQMethodBody) { if (_log.IsDebugEnabled) { _log.Debug("Method frame received: " + frame); } AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession); try { bool wasAnyoneInterested = false; lock (_frameListeners.SyncRoot) { foreach (IAMQMethodListener listener in _frameListeners) { wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested; } } if (!wasAnyoneInterested) { throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); } } catch (Exception e) { foreach (IAMQMethodListener listener in _frameListeners) { listener.Error(e); } } } else if (frame.BodyFrame is ContentHeaderBody) { _protocolSession.MessageContentHeaderReceived(frame.Channel, (ContentHeaderBody)frame.BodyFrame); } else if (frame.BodyFrame is ContentBody) { _protocolSession.MessageContentBodyReceived(frame.Channel, (ContentBody)frame.BodyFrame); } else if (frame.BodyFrame is HeartbeatBody) { _log.Debug("HeartBeat received"); } } /// /// Receives notification of any IO exceptions on the connection. /// ///

Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then /// all method listeners and the application connection object are notified of the connection failure exception. /// ///

All other exception types are propagated to all method listeners. ///

public void OnException(Exception cause) { _log.Warn("public void OnException(Exception cause = " + cause + "): called"); // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also // ensures that this exception is fully propagated to all listeners, before another one can be processed. lock (_lock) { if (cause is AMQConnectionClosedException || cause is System.IO.IOException) { // Try a fail-over because the connection has failed. FailoverState failoverState = AttemptFailover(); // Check if the fail-over has failed, in which case notify all method listeners of the exception. // The application connection object is also notified of the failure of the connection with the exception. if (failoverState == FailoverState.FAILED) { _log.Debug("Fail-over has failed. Notifying all method listeners of the exception."); AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); PropagateExceptionToWaiters(amqe); _connection.ExceptionReceived(cause); } } // Notify all method listeners of the exception. else { PropagateExceptionToWaiters(cause); _connection.ExceptionReceived(cause); } } } /// /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress /// this method allows it to continue to run and will do nothing. /// ///

This method should only be called when the connection has been remotely closed. ///

/// /// The fail-over state at the end of this attempt. private FailoverState AttemptFailover() { _log.Debug("private void AttemptFailover(): called"); _log.Debug("_failoverState = " + _failoverState); // Ensure that the connection stops sending heart beats, if it still is. _connection.StopHeartBeatThread(); // Check that the connection policy allows fail-over to be attempted. if (!_connection.IsFailoverAllowed) { _log.Debug("Connection does not allowed to failover"); _connection.ExceptionReceived( new AMQDisconnectedException("Broker closed connection and reconnection is not permitted.")); } // Check if connection was closed deliberately by the application, in which case no fail-over is attempted. if (_connection.Closed) { return _failoverState; } // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is // advanced to 'in progress' if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed) { _log.Info("Starting the fail-over process."); _failoverState = FailoverState.IN_PROGRESS; StartFailoverThread(); } return _failoverState; } /// /// There are two cases where we have other threads potentially blocking for events to be handled by this /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can /// react appropriately. /// /// the exception to propagate /// public void PropagateExceptionToWaiters(Exception e) { // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over. _stateManager.Error(e); lock ( _lock ) { foreach ( IAMQMethodListener listener in _frameListeners ) { listener.Error(e); } } } public void AddFrameListener(IAMQMethodListener listener) { lock ( _lock ) { _frameListeners.Add(listener); } } public void RemoveFrameListener(IAMQMethodListener listener) { if (_log.IsDebugEnabled) { _log.Debug("Removing frame listener: " + listener.ToString()); } lock ( _lock ) { _frameListeners.Remove(listener); } } public void BlockUntilNotFailingOver() { if (FailoverLatch != null) { FailoverLatch.WaitOne(); } } /// /// "Failover" for redirection. /// /// /// public void Failover(string host, int port) { _failoverHandler.setHost(host); _failoverHandler.setPort(port); // see javadoc for FailoverHandler to see rationale for separate thread StartFailoverThread(); } private void StartFailoverThread() { Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run)); failoverThread.Name = "Failover"; // Do not inherit daemon-ness from current thread as this can be a daemon // thread such as a AnonymousIoService thread. failoverThread.IsBackground = false; failoverThread.Start(); } } }