summaryrefslogtreecommitdiff
path: root/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs')
-rw-r--r--trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs873
1 files changed, 0 insertions, 873 deletions
diff --git a/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
deleted file mode 100644
index 41d4e089b6..0000000000
--- a/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.IO;
-using System.Reflection;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Client.Failover;
-using Apache.Qpid.Client.Protocol;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client.State;
-using Apache.Qpid.Client.Transport;
-using Apache.Qpid.Client.Transport.Socket.Blocking;
-using Apache.Qpid.Collections;
-using Apache.Qpid.Framing;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Client
-{
- public class AMQConnection : Closeable, IConnection
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection));
-
- IConnectionInfo _connectionInfo;
- private int _nextChannelId = 0;
-
- // _Connected should be refactored with a suitable wait object.
- private bool _connected;
-
- Thread _heartBeatThread;
- HeartBeatThread _heartBeatRunner;
-
- // The last error code that occured on the connection. Used to return the correct exception to the client
- private AMQException _lastAMQException = null;
-
- /**
- * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
- * This must be held by any child objects of this connection such as the session, producers and consumers.
- */
- private readonly Object _failoverMutex = new Object();
- public object FailoverMutex
- {
- get { return _failoverMutex; }
- }
-
- /**
- * Policy dictating how to failover
- */
- private FailoverPolicy _failoverPolicy;
-
- internal bool IsFailoverAllowed
- {
- get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); }
- }
-
- /// <summary>
- /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
- /// per session and we must prevent the client from opening too many. Zero means unlimited.
- /// </summary>
- private ushort _maximumChannelCount;
-
- /// <summary>
- /// The maximum size of frame supported by the server
- /// </summary>
- private uint _maximumFrameSize;
-
- private AMQStateManager _stateManager;
-
- private AMQProtocolSession _protocolSession;
- public AMQProtocolSession ProtocolSession { get { return _protocolSession; } }
-
- /// <summary>
- /// Maps from session id (Integer) to AmqChannel instance
- /// </summary>
- private readonly IDictionary _sessions = new LinkedHashtable();
-
- private ExceptionListenerDelegate _exceptionListener;
-
- private IConnectionListener _connectionListener;
-
- private ITransport _transport;
- public ITransport Transport { get { return _transport; } }
-
- /// <summary>
- /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
- /// message publication.
- /// </summary>
- private bool _started;
-
- private AMQProtocolListener _protocolListener;
- public AMQProtocolListener ProtocolListener { get { return _protocolListener; } }
-
- public IProtocolWriter ProtocolWriter
- {
- get { return _transport.ProtocolWriter; }
- }
-
- ProtocolWriter _protocolWriter;
-
- public ProtocolWriter ConvenientProtocolWriter
- {
- get { return _protocolWriter; }
- }
-
- public AMQConnection(IConnectionInfo connectionInfo)
- {
- if (connectionInfo == null)
- {
- throw new ArgumentException("ConnectionInfo must be specified");
- }
- _log.Debug("ConnectionInfo: " + connectionInfo);
- _connectionInfo = connectionInfo;
- _log.Debug("password = " + _connectionInfo.Password);
- _failoverPolicy = new FailoverPolicy(connectionInfo);
-
- // We are not currently connected.
- _connected = false;
-
- Exception lastException = null;
- do
- {
- try
- {
- IBrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo();
- _log.Debug("Connecting to " + brokerInfo);
- MakeBrokerConnection(brokerInfo);
- break;
- }
- catch (Exception e)
- {
- lastException = e;
- _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
- // XXX: Should perhaps break out of the do/while here if not a SocketException...
- }
- } while (!_connected && _failoverPolicy.FailoverAllowed());
-
- _log.Debug("Are we connected:" + _connected);
-
- if (!_connected)
- {
- if ( lastException is AMQException )
- {
- throw lastException;
- }
- else
- {
- throw new AMQConnectionException("Unable to connect", lastException);
- }
- }
-
- }
-
- /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)
- {
- //Assembly assembly = Assembly.LoadFrom(assemblyName);
- Assembly assembly = Assembly.Load(assemblyName);
-
- foreach (Type type in assembly.GetTypes())
- {
- _log.Debug(String.Format("type = {0}", type));
- }
-
- Type transport = assembly.GetType(transportType);
-
- if (transport == null)
- {
- throw new ArgumentException(
- String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName));
-
- }
-
- _log.Debug("transport = " + transport);
- _log.Debug("ctors = " + transport.GetConstructors());
-
- ConstructorInfo info = transport.GetConstructors()[0];
- ITransport result = (ITransport)info.Invoke(new object[] { host, port, this });
-
- _log.Debug("transport = " + result);
-
- return result;
- }*/
-
- public void Disconnect()
- {
- _transport.Close();
- }
-
- #region IConnection Members
-
- public string ClientID
- {
- get
- {
- CheckNotClosed();
- return _connectionInfo.ClientName;
- }
- set
- {
- CheckNotClosed();
- _connectionInfo.ClientName = value;
- }
- }
-
- public override void Close()
- {
- lock (FailoverMutex)
- {
- // atomically set to closed and check the _previous value was NOT CLOSED
- if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED)
- {
- try
- {
- CloseAllSessions(null);
- CloseConnection();
- }
- catch (AMQException e)
- {
- throw new QpidException("Error closing connection: " + e);
- }
- }
- }
- }
-
- private void CloseConnection()
- {
- _stateManager.ChangeState(AMQState.CONNECTION_CLOSING);
-
- AMQFrame frame = ConnectionCloseBody.CreateAMQFrame(
- 0, 200, "Qpid.NET client is closing the connection.", 0, 0);
-
- ProtocolWriter.Write(frame);
-
- _log.Debug("Blocking for connection close ok frame");
-
- Disconnect();
- }
-
- class CreateChannelFailoverSupport : FailoverSupport
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport));
-
- private bool _transacted;
- private AcknowledgeMode _acknowledgeMode;
- int _prefetchHigh;
- int _prefetchLow;
- AMQConnection _connection;
-
- public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
- {
- _connection = connection;
- _transacted = transacted;
- _acknowledgeMode = acknowledgeMode;
- _prefetchHigh = prefetchHigh;
- _prefetchLow = prefetchLow;
- }
-
- protected override object operation()
- {
- ushort channelId = _connection.NextChannelId();
-
- if (_log.IsDebugEnabled)
- {
- _log.Debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the channel and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AmqChannel channel = new AmqChannel(_connection,
- channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow);
- _connection.ProtocolSession.AddSessionByChannel(channelId, channel);
- _connection.RegisterSession(channelId, channel);
-
- bool success = false;
- try
- {
- _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted);
- success = true;
- }
- catch (AMQException e)
- {
- throw new QpidException("Error creating channel: " + e, e);
- }
- finally
- {
- if (!success) {
- _connection.ProtocolSession.RemoveSessionByChannel(channelId);
- _connection.DeregisterSession(channelId);
- }
- }
-
- if (_connection._started)
- {
- channel.Start();
- }
- return channel;
- }
- }
-
- internal ushort NextChannelId()
- {
- return (ushort) Interlocked.Increment(ref _nextChannelId);
- }
-
- public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode)
- {
- return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK);
- }
-
- public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
- {
- return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch);
- }
-
- public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
- {
- CheckNotClosed();
- if (ChannelLimitReached())
- {
- throw new ChannelLimitReachedException(_maximumChannelCount);
- }
- else
- {
- CreateChannelFailoverSupport operation =
- new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow);
- return (IChannel)operation.execute(this);
- }
- }
-
- public void CloseSession(AmqChannel channel)
- {
- // FIXME: Don't we need FailoverSupport here (as we have SyncWrite).
- _protocolSession.CloseSession(channel);
-
- AMQFrame frame = ChannelCloseBody.CreateAMQFrame(
- channel.ChannelId, 200, "JMS client closing channel", 0, 0);
-
- _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId);
- _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody));
- _log.Debug("Received channel close frame");
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully
- }
-
- public ExceptionListenerDelegate ExceptionListener
- {
- get
- {
- CheckNotClosed();
- return _exceptionListener;
- }
- set
- {
- CheckNotClosed();
- _exceptionListener = value;
- }
- }
-
- /// <summary>
- /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
- /// and is not thread safe (which is legal according to the JMS specification).
- /// @throws JMSException
- /// </summary>
- public void Start()
- {
- CheckNotClosed();
-
- if (!_started)
- {
- foreach (DictionaryEntry lde in _sessions)
- {
- AmqChannel s = (AmqChannel)lde.Value;
- s.Start();
- }
- _started = true;
- }
- }
-
- public void Stop()
- {
- CheckNotClosed();
-
- if (_started)
- {
- foreach (DictionaryEntry lde in _sessions)
- {
- AmqChannel s = (AmqChannel) lde.Value;
- s.Stop();
- }
- _started = false;
- }
- }
-
- public IConnectionListener ConnectionListener
- {
- get { return _connectionListener; }
- set { _connectionListener = value; }
- }
-
- #endregion
-
- #region IDisposable Members
-
- public void Dispose()
- {
- Close();
- }
-
- #endregion
-
- private bool ChannelLimitReached()
- {
- return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount;
- }
-
- /// <summary>
- /// Close all the sessions, either due to normal connection closure or due to an error occurring.
- /// @param cause if not null, the error that is causing this shutdown
- /// </summary>
- private void CloseAllSessions(Exception cause)
- {
- _log.Debug("Closing all session in connection " + this);
- ICollection sessions = new ArrayList(_sessions.Values);
- foreach (AmqChannel channel in sessions)
- {
- _log.Debug("Closing channel " + channel);
- if (cause != null)
- {
- channel.ClosedWithException(cause);
- }
- else
- {
- try
- {
- channel.Close();
- }
- catch (QpidException e)
- {
- _log.Error("Error closing channel: " + e);
- }
- }
- }
- _log.Debug("Done closing all sessions in connection " + this);
- }
-
- public int MaximumChannelCount
- {
- get
- {
- CheckNotClosed();
- return _maximumChannelCount;
- }
- }
-
- internal void SetMaximumChannelCount(ushort maximumChannelCount)
- {
- CheckNotClosed();
- _maximumChannelCount = maximumChannelCount;
- }
-
- public uint MaximumFrameSize
- {
- get
- {
- return _maximumFrameSize;
- }
-
- set
- {
- _maximumFrameSize = value;
- }
- }
-
- public IDictionary Sessions
- {
- get
- {
- return _sessions;
- }
- }
-
- public string Host
- {
- get
- {
- return _failoverPolicy.GetCurrentBrokerInfo().Host;
- }
- }
-
- public int Port
- {
- get
- {
- return _failoverPolicy.GetCurrentBrokerInfo().Port;
- }
- }
-
- public string Username
- {
- get
- {
- return _connectionInfo.Username;
- }
- }
-
- public string Password
- {
- get
- {
- return _connectionInfo.Password;
- }
- }
-
- public string VirtualHost
- {
- get
- {
- return _connectionInfo.VirtualHost;
- }
- }
-
- /// <summary>
- /// Invoked by the AMQProtocolSession when a protocol session exception has occurred.
- /// This method sends the exception to a JMS exception listener, if configured, and
- /// propagates the exception to sessions, which in turn will propagate to consumers.
- /// This allows synchronous consumers to have exceptions thrown to them.
- /// </summary>
- /// <param name="cause">the exception</param>
- public void ExceptionReceived(Exception cause)
- {
- if (_exceptionListener != null)
- {
- // Listener expects one of these...
- QpidException xe;
-
- if (cause is QpidException)
- {
- xe = (QpidException) cause;
- }
- else
- {
- xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause);
- }
- // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
- // so that any generic client code that tries to close the connection will not mess up this error
- // handling sequence
- if (cause is IOException)
- {
- Interlocked.Exchange(ref _closed, CLOSED);
- }
-#if __MonoCS__
- _exceptionListener(xe);
-#else
- _exceptionListener.Invoke(xe);
-#endif
- }
- else
- {
- _log.Error("Connection exception: " + cause);
- }
-
- // An undelivered is not fatal to the connections usability.
- if (!(cause is AMQUndeliveredException))
- {
- Interlocked.Exchange(ref _closed, CLOSED);
- CloseAllSessions(cause);
- }
- else
- {
- ;
- }
- }
-
- internal void RegisterSession(int channelId, AmqChannel channel)
- {
- _sessions[channelId] = channel;
- }
-
- internal void DeregisterSession(int channelId)
- {
- _sessions.Remove(channelId);
- }
-
- /**
- * Fire the preFailover event to the registered connection listener (if any)
- *
- * @param redirect true if this is the result of a redirect request rather than a connection error
- * @return true if no listener or listener does not veto change
- */
- public bool FirePreFailover(bool redirect)
- {
- bool proceed = true;
- if (_connectionListener != null)
- {
- proceed = _connectionListener.PreFailover(redirect);
- }
- return proceed;
- }
-
- /**
- * Fire the preResubscribe event to the registered connection listener (if any). If the listener
- * vetoes resubscription then all the sessions are closed.
- *
- * @return true if no listener or listener does not veto resubscription.
- * @throws JMSException
- */
- public bool FirePreResubscribe()
- {
- if (_connectionListener != null)
- {
- bool resubscribe = _connectionListener.PreResubscribe();
- if (!resubscribe)
- {
- MarkAllSessionsClosed();
- }
- return resubscribe;
- }
- else
- {
- return true;
- }
- }
-
- /**
- * Marks all sessions and their children as closed without sending any protocol messages. Useful when
- * you need to mark objects "visible" in userland as closed after failover or other significant event that
- * impacts the connection.
- * <p/>
- * The caller must hold the failover mutex before calling this method.
- */
- private void MarkAllSessionsClosed()
- {
- //LinkedList sessionCopy = new LinkedList(_sessions.values());
- ArrayList sessionCopy = new ArrayList(_sessions.Values);
- foreach (AmqChannel session in sessionCopy)
- {
- session.MarkClosed();
- }
- _sessions.Clear();
- }
-
- /**
- * Fires a failover complete event to the registered connection listener (if any).
- */
- public void FireFailoverComplete()
- {
- if (_connectionListener != null)
- {
- _connectionListener.FailoverComplete();
- }
- }
-
- public bool AttemptReconnection(String host, int port, SslOptions sslConfig)
- {
- IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig);
-
- _failoverPolicy.setBroker(bd);
-
- try
- {
- MakeBrokerConnection(bd);
- return true;
- }
- catch (Exception e)
- {
- _log.Debug("Unable to connect to broker at " + bd, e);
- AttemptReconnection();
- }
- return false;
- }
-
- private void MakeBrokerConnection(IBrokerInfo brokerDetail)
- {
- try
- {
- _stateManager = new AMQStateManager();
- _protocolListener = new AMQProtocolListener(this, _stateManager);
- _protocolListener.AddFrameListener(_stateManager);
-
- /*
- // Currently there is only one transport option - BlockingSocket.
- String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll";
- String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport";
-
- // Load the transport assembly dynamically.
- _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType);
- */
-
- _transport = new BlockingSocketTransport();
-
- // Connect.
- _transport.Connect(brokerDetail, this);
- _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener);
- _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this);
- _protocolListener.ProtocolSession = _protocolSession;
-
- // Now start the connection "handshake".
- _transport.ProtocolWriter.Write(new ProtocolInitiation());
-
- // Blocks until the connection has been opened.
- _stateManager.AttainState(AMQState.CONNECTION_OPEN);
-
- _failoverPolicy.attainedConnection();
-
- // XXX: Again this should be changed to a suitable notify.
- _connected = true;
- }
- catch (AMQException e)
- {
- _lastAMQException = e;
- throw; // rethrow
- }
- }
-
- public bool AttemptReconnection()
- {
- while (_failoverPolicy.FailoverAllowed())
- {
- try
- {
- MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo());
- return true;
- }
- catch (Exception e)
- {
- if (!(e is AMQException))
- {
- _log.Debug("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e);
- }
- else
- {
- _log.Debug(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo());
- }
- }
- }
-
- // Connection unsuccessful.
- return false;
- }
-
- /**
- * For all channels, and for all consumers in those sessions, resubscribe. This is called during failover handling.
- * The caller must hold the failover mutex before calling this method.
- */
- public void ResubscribeChannels()
- {
- ArrayList channels = new ArrayList(_sessions.Values);
- _log.Debug(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count));
- foreach (AmqChannel channel in channels)
- {
- _protocolSession.AddSessionByChannel(channel.ChannelId, channel);
- ReopenChannel(
- channel.ChannelId,
- channel.DefaultPrefetchHigh,
- channel.DefaultPrefetchLow,
- channel.Transacted
- );
- channel.ReplayOnFailOver();
- }
- }
-
- private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
- {
- _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}",
- channelId, prefetchHigh, prefetchLow, transacted));
- try
- {
- CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- }
- catch (AMQException e)
- {
- _protocolSession.RemoveSessionByChannel(channelId);
- DeregisterSession(channelId);
- throw new AMQException("Error reopening channel " + channelId + " after failover: " + e);
- }
- }
-
- void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
- {
- _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody));
-
- // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We
- // know this when we connection using AMQP 0.7
- if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7)
- {
- // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d.
- _protocolWriter.SyncWrite(BasicQosBody.CreateAMQFrame(channelId, 0, (ushort)prefetchHigh, false), typeof (BasicQosOkBody));
- }
-
- if (transacted)
- {
- if (_log.IsDebugEnabled)
- {
- _log.Debug("Issuing TxSelect for " + channelId);
- }
- _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody));
- }
- }
-
- public String toURL()
- {
- return _connectionInfo.AsUrl();
- }
-
- class HeartBeatThread
- {
- int _heartbeatMillis;
- IProtocolWriter _protocolWriter;
- bool _run = true;
-
- public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis)
- {
- _protocolWriter = protocolWriter;
- _heartbeatMillis = heartbeatMillis;
- }
-
- public void Run()
- {
- while (_run)
- {
- Thread.Sleep(_heartbeatMillis);
- if (!_run) break;
- _log.Debug("Sending heartbeat");
- // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker?
- _protocolWriter.Write(HeartbeatBody.FRAME);
- }
- _log.Debug("Heatbeat thread stopped");
- }
-
- public void Stop()
- {
- _run = false;
- }
- }
-
- public void StartHeartBeatThread(int heartbeatSeconds)
- {
- _log.Debug("Starting new heartbeat thread");
- _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000);
- _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run));
- _heartBeatThread.Name = "HeartBeat";
- _heartBeatThread.Start();
- }
-
- public void StopHeartBeatThread()
- {
- if (_heartBeatRunner != null)
- {
- _log.Debug("Stopping old heartbeat thread");
- _heartBeatRunner.Stop();
- }
- }
- }
-}