diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/AMQConnection.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/AMQConnection.cs | 873 |
1 files changed, 873 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs new file mode 100644 index 0000000000..41d4e089b6 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -0,0 +1,873 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.IO; +using System.Reflection; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Failover; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client.State; +using Apache.Qpid.Client.Transport; +using Apache.Qpid.Client.Transport.Socket.Blocking; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client +{ + public class AMQConnection : Closeable, IConnection + { + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection)); + + IConnectionInfo _connectionInfo; + private int _nextChannelId = 0; + + // _Connected should be refactored with a suitable wait object. + private bool _connected; + + Thread _heartBeatThread; + HeartBeatThread _heartBeatRunner; + + // The last error code that occured on the connection. Used to return the correct exception to the client + private AMQException _lastAMQException = null; + + /** + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. + * This must be held by any child objects of this connection such as the session, producers and consumers. + */ + private readonly Object _failoverMutex = new Object(); + public object FailoverMutex + { + get { return _failoverMutex; } + } + + /** + * Policy dictating how to failover + */ + private FailoverPolicy _failoverPolicy; + + internal bool IsFailoverAllowed + { + get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); } + } + + /// <summary> + /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels + /// per session and we must prevent the client from opening too many. Zero means unlimited. + /// </summary> + private ushort _maximumChannelCount; + + /// <summary> + /// The maximum size of frame supported by the server + /// </summary> + private uint _maximumFrameSize; + + private AMQStateManager _stateManager; + + private AMQProtocolSession _protocolSession; + public AMQProtocolSession ProtocolSession { get { return _protocolSession; } } + + /// <summary> + /// Maps from session id (Integer) to AmqChannel instance + /// </summary> + private readonly IDictionary _sessions = new LinkedHashtable(); + + private ExceptionListenerDelegate _exceptionListener; + + private IConnectionListener _connectionListener; + + private ITransport _transport; + public ITransport Transport { get { return _transport; } } + + /// <summary> + /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for + /// message publication. + /// </summary> + private bool _started; + + private AMQProtocolListener _protocolListener; + public AMQProtocolListener ProtocolListener { get { return _protocolListener; } } + + public IProtocolWriter ProtocolWriter + { + get { return _transport.ProtocolWriter; } + } + + ProtocolWriter _protocolWriter; + + public ProtocolWriter ConvenientProtocolWriter + { + get { return _protocolWriter; } + } + + public AMQConnection(IConnectionInfo connectionInfo) + { + if (connectionInfo == null) + { + throw new ArgumentException("ConnectionInfo must be specified"); + } + _log.Debug("ConnectionInfo: " + connectionInfo); + _connectionInfo = connectionInfo; + _log.Debug("password = " + _connectionInfo.Password); + _failoverPolicy = new FailoverPolicy(connectionInfo); + + // We are not currently connected. + _connected = false; + + Exception lastException = null; + do + { + try + { + IBrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo(); + _log.Debug("Connecting to " + brokerInfo); + MakeBrokerConnection(brokerInfo); + break; + } + catch (Exception e) + { + lastException = e; + _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); + // XXX: Should perhaps break out of the do/while here if not a SocketException... + } + } while (!_connected && _failoverPolicy.FailoverAllowed()); + + _log.Debug("Are we connected:" + _connected); + + if (!_connected) + { + if ( lastException is AMQException ) + { + throw lastException; + } + else + { + throw new AMQConnectionException("Unable to connect", lastException); + } + } + + } + + /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) + { + //Assembly assembly = Assembly.LoadFrom(assemblyName); + Assembly assembly = Assembly.Load(assemblyName); + + foreach (Type type in assembly.GetTypes()) + { + _log.Debug(String.Format("type = {0}", type)); + } + + Type transport = assembly.GetType(transportType); + + if (transport == null) + { + throw new ArgumentException( + String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName)); + + } + + _log.Debug("transport = " + transport); + _log.Debug("ctors = " + transport.GetConstructors()); + + ConstructorInfo info = transport.GetConstructors()[0]; + ITransport result = (ITransport)info.Invoke(new object[] { host, port, this }); + + _log.Debug("transport = " + result); + + return result; + }*/ + + public void Disconnect() + { + _transport.Close(); + } + + #region IConnection Members + + public string ClientID + { + get + { + CheckNotClosed(); + return _connectionInfo.ClientName; + } + set + { + CheckNotClosed(); + _connectionInfo.ClientName = value; + } + } + + public override void Close() + { + lock (FailoverMutex) + { + // atomically set to closed and check the _previous value was NOT CLOSED + if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED) + { + try + { + CloseAllSessions(null); + CloseConnection(); + } + catch (AMQException e) + { + throw new QpidException("Error closing connection: " + e); + } + } + } + } + + private void CloseConnection() + { + _stateManager.ChangeState(AMQState.CONNECTION_CLOSING); + + AMQFrame frame = ConnectionCloseBody.CreateAMQFrame( + 0, 200, "Qpid.NET client is closing the connection.", 0, 0); + + ProtocolWriter.Write(frame); + + _log.Debug("Blocking for connection close ok frame"); + + Disconnect(); + } + + class CreateChannelFailoverSupport : FailoverSupport + { + private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport)); + + private bool _transacted; + private AcknowledgeMode _acknowledgeMode; + int _prefetchHigh; + int _prefetchLow; + AMQConnection _connection; + + public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) + { + _connection = connection; + _transacted = transacted; + _acknowledgeMode = acknowledgeMode; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; + } + + protected override object operation() + { + ushort channelId = _connection.NextChannelId(); + + if (_log.IsDebugEnabled) + { + _log.Debug("Write channel open frame for channel id " + channelId); + } + + // We must create the channel and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AmqChannel channel = new AmqChannel(_connection, + channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow); + _connection.ProtocolSession.AddSessionByChannel(channelId, channel); + _connection.RegisterSession(channelId, channel); + + bool success = false; + try + { + _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted); + success = true; + } + catch (AMQException e) + { + throw new QpidException("Error creating channel: " + e, e); + } + finally + { + if (!success) { + _connection.ProtocolSession.RemoveSessionByChannel(channelId); + _connection.DeregisterSession(channelId); + } + } + + if (_connection._started) + { + channel.Start(); + } + return channel; + } + } + + internal ushort NextChannelId() + { + return (ushort) Interlocked.Increment(ref _nextChannelId); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode) + { + return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) + { + return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow) + { + CheckNotClosed(); + if (ChannelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } + else + { + CreateChannelFailoverSupport operation = + new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow); + return (IChannel)operation.execute(this); + } + } + + public void CloseSession(AmqChannel channel) + { + // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). + _protocolSession.CloseSession(channel); + + AMQFrame frame = ChannelCloseBody.CreateAMQFrame( + channel.ChannelId, 200, "JMS client closing channel", 0, 0); + + _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId); + _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody)); + _log.Debug("Received channel close frame"); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully + } + + public ExceptionListenerDelegate ExceptionListener + { + get + { + CheckNotClosed(); + return _exceptionListener; + } + set + { + CheckNotClosed(); + _exceptionListener = value; + } + } + + /// <summary> + /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread + /// and is not thread safe (which is legal according to the JMS specification). + /// @throws JMSException + /// </summary> + public void Start() + { + CheckNotClosed(); + + if (!_started) + { + foreach (DictionaryEntry lde in _sessions) + { + AmqChannel s = (AmqChannel)lde.Value; + s.Start(); + } + _started = true; + } + } + + public void Stop() + { + CheckNotClosed(); + + if (_started) + { + foreach (DictionaryEntry lde in _sessions) + { + AmqChannel s = (AmqChannel) lde.Value; + s.Stop(); + } + _started = false; + } + } + + public IConnectionListener ConnectionListener + { + get { return _connectionListener; } + set { _connectionListener = value; } + } + + #endregion + + #region IDisposable Members + + public void Dispose() + { + Close(); + } + + #endregion + + private bool ChannelLimitReached() + { + return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount; + } + + /// <summary> + /// Close all the sessions, either due to normal connection closure or due to an error occurring. + /// @param cause if not null, the error that is causing this shutdown + /// </summary> + private void CloseAllSessions(Exception cause) + { + _log.Debug("Closing all session in connection " + this); + ICollection sessions = new ArrayList(_sessions.Values); + foreach (AmqChannel channel in sessions) + { + _log.Debug("Closing channel " + channel); + if (cause != null) + { + channel.ClosedWithException(cause); + } + else + { + try + { + channel.Close(); + } + catch (QpidException e) + { + _log.Error("Error closing channel: " + e); + } + } + } + _log.Debug("Done closing all sessions in connection " + this); + } + + public int MaximumChannelCount + { + get + { + CheckNotClosed(); + return _maximumChannelCount; + } + } + + internal void SetMaximumChannelCount(ushort maximumChannelCount) + { + CheckNotClosed(); + _maximumChannelCount = maximumChannelCount; + } + + public uint MaximumFrameSize + { + get + { + return _maximumFrameSize; + } + + set + { + _maximumFrameSize = value; + } + } + + public IDictionary Sessions + { + get + { + return _sessions; + } + } + + public string Host + { + get + { + return _failoverPolicy.GetCurrentBrokerInfo().Host; + } + } + + public int Port + { + get + { + return _failoverPolicy.GetCurrentBrokerInfo().Port; + } + } + + public string Username + { + get + { + return _connectionInfo.Username; + } + } + + public string Password + { + get + { + return _connectionInfo.Password; + } + } + + public string VirtualHost + { + get + { + return _connectionInfo.VirtualHost; + } + } + + /// <summary> + /// Invoked by the AMQProtocolSession when a protocol session exception has occurred. + /// This method sends the exception to a JMS exception listener, if configured, and + /// propagates the exception to sessions, which in turn will propagate to consumers. + /// This allows synchronous consumers to have exceptions thrown to them. + /// </summary> + /// <param name="cause">the exception</param> + public void ExceptionReceived(Exception cause) + { + if (_exceptionListener != null) + { + // Listener expects one of these... + QpidException xe; + + if (cause is QpidException) + { + xe = (QpidException) cause; + } + else + { + xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause); + } + // in the case of an IOException, MINA has closed the protocol session so we set _closed to true + // so that any generic client code that tries to close the connection will not mess up this error + // handling sequence + if (cause is IOException) + { + Interlocked.Exchange(ref _closed, CLOSED); + } +#if __MonoCS__ + _exceptionListener(xe); +#else + _exceptionListener.Invoke(xe); +#endif + } + else + { + _log.Error("Connection exception: " + cause); + } + + // An undelivered is not fatal to the connections usability. + if (!(cause is AMQUndeliveredException)) + { + Interlocked.Exchange(ref _closed, CLOSED); + CloseAllSessions(cause); + } + else + { + ; + } + } + + internal void RegisterSession(int channelId, AmqChannel channel) + { + _sessions[channelId] = channel; + } + + internal void DeregisterSession(int channelId) + { + _sessions.Remove(channelId); + } + + /** + * Fire the preFailover event to the registered connection listener (if any) + * + * @param redirect true if this is the result of a redirect request rather than a connection error + * @return true if no listener or listener does not veto change + */ + public bool FirePreFailover(bool redirect) + { + bool proceed = true; + if (_connectionListener != null) + { + proceed = _connectionListener.PreFailover(redirect); + } + return proceed; + } + + /** + * Fire the preResubscribe event to the registered connection listener (if any). If the listener + * vetoes resubscription then all the sessions are closed. + * + * @return true if no listener or listener does not veto resubscription. + * @throws JMSException + */ + public bool FirePreResubscribe() + { + if (_connectionListener != null) + { + bool resubscribe = _connectionListener.PreResubscribe(); + if (!resubscribe) + { + MarkAllSessionsClosed(); + } + return resubscribe; + } + else + { + return true; + } + } + + /** + * Marks all sessions and their children as closed without sending any protocol messages. Useful when + * you need to mark objects "visible" in userland as closed after failover or other significant event that + * impacts the connection. + * <p/> + * The caller must hold the failover mutex before calling this method. + */ + private void MarkAllSessionsClosed() + { + //LinkedList sessionCopy = new LinkedList(_sessions.values()); + ArrayList sessionCopy = new ArrayList(_sessions.Values); + foreach (AmqChannel session in sessionCopy) + { + session.MarkClosed(); + } + _sessions.Clear(); + } + + /** + * Fires a failover complete event to the registered connection listener (if any). + */ + public void FireFailoverComplete() + { + if (_connectionListener != null) + { + _connectionListener.FailoverComplete(); + } + } + + public bool AttemptReconnection(String host, int port, SslOptions sslConfig) + { + IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig); + + _failoverPolicy.setBroker(bd); + + try + { + MakeBrokerConnection(bd); + return true; + } + catch (Exception e) + { + _log.Debug("Unable to connect to broker at " + bd, e); + AttemptReconnection(); + } + return false; + } + + private void MakeBrokerConnection(IBrokerInfo brokerDetail) + { + try + { + _stateManager = new AMQStateManager(); + _protocolListener = new AMQProtocolListener(this, _stateManager); + _protocolListener.AddFrameListener(_stateManager); + + /* + // Currently there is only one transport option - BlockingSocket. + String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll"; + String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport"; + + // Load the transport assembly dynamically. + _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); + */ + + _transport = new BlockingSocketTransport(); + + // Connect. + _transport.Connect(brokerDetail, this); + _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener); + _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this); + _protocolListener.ProtocolSession = _protocolSession; + + // Now start the connection "handshake". + _transport.ProtocolWriter.Write(new ProtocolInitiation()); + + // Blocks until the connection has been opened. + _stateManager.AttainState(AMQState.CONNECTION_OPEN); + + _failoverPolicy.attainedConnection(); + + // XXX: Again this should be changed to a suitable notify. + _connected = true; + } + catch (AMQException e) + { + _lastAMQException = e; + throw; // rethrow + } + } + + public bool AttemptReconnection() + { + while (_failoverPolicy.FailoverAllowed()) + { + try + { + MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo()); + return true; + } + catch (Exception e) + { + if (!(e is AMQException)) + { + _log.Debug("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e); + } + else + { + _log.Debug(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo()); + } + } + } + + // Connection unsuccessful. + return false; + } + + /** + * For all channels, and for all consumers in those sessions, resubscribe. This is called during failover handling. + * The caller must hold the failover mutex before calling this method. + */ + public void ResubscribeChannels() + { + ArrayList channels = new ArrayList(_sessions.Values); + _log.Debug(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count)); + foreach (AmqChannel channel in channels) + { + _protocolSession.AddSessionByChannel(channel.ChannelId, channel); + ReopenChannel( + channel.ChannelId, + channel.DefaultPrefetchHigh, + channel.DefaultPrefetchLow, + channel.Transacted + ); + channel.ReplayOnFailOver(); + } + } + + private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) + { + _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}", + channelId, prefetchHigh, prefetchLow, transacted)); + try + { + CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + } + catch (AMQException e) + { + _protocolSession.RemoveSessionByChannel(channelId); + DeregisterSession(channelId); + throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); + } + } + + void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted) + { + _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); + + // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We + // know this when we connection using AMQP 0.7 + if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7) + { + // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d. + _protocolWriter.SyncWrite(BasicQosBody.CreateAMQFrame(channelId, 0, (ushort)prefetchHigh, false), typeof (BasicQosOkBody)); + } + + if (transacted) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Issuing TxSelect for " + channelId); + } + _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody)); + } + } + + public String toURL() + { + return _connectionInfo.AsUrl(); + } + + class HeartBeatThread + { + int _heartbeatMillis; + IProtocolWriter _protocolWriter; + bool _run = true; + + public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis) + { + _protocolWriter = protocolWriter; + _heartbeatMillis = heartbeatMillis; + } + + public void Run() + { + while (_run) + { + Thread.Sleep(_heartbeatMillis); + if (!_run) break; + _log.Debug("Sending heartbeat"); + // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker? + _protocolWriter.Write(HeartbeatBody.FRAME); + } + _log.Debug("Heatbeat thread stopped"); + } + + public void Stop() + { + _run = false; + } + } + + public void StartHeartBeatThread(int heartbeatSeconds) + { + _log.Debug("Starting new heartbeat thread"); + _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000); + _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run)); + _heartBeatThread.Name = "HeartBeat"; + _heartBeatThread.Start(); + } + + public void StopHeartBeatThread() + { + if (_heartBeatRunner != null) + { + _log.Debug("Stopping old heartbeat thread"); + _heartBeatRunner.Stop(); + } + } + } +} |