diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs new file mode 100644 index 0000000000..83c69b7d25 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.State; + +namespace Apache.Qpid.Client.Failover +{ + public class FailoverHandler + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverHandler)); + + private AMQConnection _connection; + + /** + * Used where forcing the failover host + */ + private String _host; + + /** + * Used where forcing the failover port + */ + private int _port; + + public FailoverHandler(AMQConnection connection) + { + _connection = connection; + } + + public void Run() + { + if (Thread.CurrentThread.IsBackground) + { + throw new InvalidOperationException("FailoverHandler must Run on a non-background thread."); + } + + AMQProtocolListener pl = _connection.ProtocolListener; + pl.FailoverLatch = new ManualResetEvent(false); + + // We wake up listeners. If they can handle failover, they will extend the + // FailoverSupport class and will in turn block on the latch until failover + // has completed before retrying the operation + _connection.ProtocolListener.PropagateExceptionToWaiters(new FailoverException("Failing over about to start")); + + // Since failover impacts several structures we protect them all with a single mutex. These structures + // are also in child objects of the connection. This allows us to manipulate them without affecting + // client code which runs in a separate thread. + lock (_connection.FailoverMutex) + { + _log.Info("Starting failover process"); + + // We switch in a new state manager temporarily so that the interaction to get to the "connection open" + // state works, without us having to terminate any existing "state waiters". We could theoretically + // have a state waiter waiting until the connection is closed for some reason. Or in future we may have + // a slightly more complex state model therefore I felt it was worthwhile doing this. + AMQStateManager existingStateManager = _connection.ProtocolListener.StateManager; + _connection.ProtocolListener.StateManager = new AMQStateManager(); + if (!_connection.FirePreFailover(_host != null)) + { + _connection.ProtocolListener.StateManager = existingStateManager; + if (_host != null) + { + _connection.ExceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + } + else + { + _connection.ExceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + } + pl.FailoverLatch.Set(); + pl.FailoverLatch = null; + return; + } + bool failoverSucceeded; + // when host is non null we have a specified failover host otherwise we all the client to cycle through + // all specified hosts + + // if _host has value then we are performing a redirect. + if (_host != null) + { + // todo: fix SSL support! + failoverSucceeded = _connection.AttemptReconnection(_host, _port, null); + } + else + { + failoverSucceeded = _connection.AttemptReconnection(); + } + + // XXX: at this point it appears that we are going to set StateManager to existingStateManager in + // XXX: both paths of control. + if (!failoverSucceeded) + { + _connection.ProtocolListener.StateManager = existingStateManager; + _connection.ExceptionReceived( + new AMQDisconnectedException("Server closed connection and no failover " + + "was successful")); + } + else + { + _connection.ProtocolListener.StateManager = existingStateManager; + try + { + if (_connection.FirePreResubscribe()) + { + _log.Info("Resubscribing on new connection"); + _connection.ResubscribeChannels(); + } + else + { + _log.Info("Client vetoed automatic resubscription"); + } + _connection.FireFailoverComplete(); + _connection.ProtocolListener.FailoverState = FailoverState.NOT_STARTED; + _log.Info("Connection failover completed successfully"); + } + catch (Exception e) + { + _log.Info("Failover process failed - exception being propagated by protocol handler"); + _connection.ProtocolListener.FailoverState = FailoverState.FAILED; + try + { + _connection.ProtocolListener.OnException(e); + } + catch (Exception ex) + { + _log.Error("Error notifying protocol session of error: " + ex, ex); + } + } + } + } + pl.FailoverLatch.Set(); + } + + public String getHost() + { + return _host; + } + + public void setHost(String host) + { + _host = host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int port) + { + _port = port; + } + } +} + + |