path: root/qpid/dotnet/Qpid.Client/Client
diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client')
80 files changed, 10779 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs b/qpid/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs
new file mode 100644
index 0000000000..7bb64e3fff
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AMQAuthenticationException.cs
@@ -0,0 +1,39 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+namespace Apache.Qpid.Client
+ [Serializable]
+ public class AMQAuthenticationException : AMQException
+ {
+ public AMQAuthenticationException(int error, String message)
+ : base(error, message)
+ {
+ }
+ protected AMQAuthenticationException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
new file mode 100644
index 0000000000..41d4e089b6
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -0,0 +1,873 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.IO;
+using System.Reflection;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Client.Failover;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Client.Transport;
+using Apache.Qpid.Client.Transport.Socket.Blocking;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+namespace Apache.Qpid.Client
+ public class AMQConnection : Closeable, IConnection
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection));
+ IConnectionInfo _connectionInfo;
+ private int _nextChannelId = 0;
+ // _Connected should be refactored with a suitable wait object.
+ private bool _connected;
+ Thread _heartBeatThread;
+ HeartBeatThread _heartBeatRunner;
+ // The last error code that occured on the connection. Used to return the correct exception to the client
+ private AMQException _lastAMQException = null;
+ /**
+ * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
+ * This must be held by any child objects of this connection such as the session, producers and consumers.
+ */
+ private readonly Object _failoverMutex = new Object();
+ public object FailoverMutex
+ {
+ get { return _failoverMutex; }
+ }
+ /**
+ * Policy dictating how to failover
+ */
+ private FailoverPolicy _failoverPolicy;
+ internal bool IsFailoverAllowed
+ {
+ get { if(!_connected) return false; else return _failoverPolicy.FailoverAllowed(); }
+ }
+ /// <summary>
+ /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
+ /// per session and we must prevent the client from opening too many. Zero means unlimited.
+ /// </summary>
+ private ushort _maximumChannelCount;
+ /// <summary>
+ /// The maximum size of frame supported by the server
+ /// </summary>
+ private uint _maximumFrameSize;
+ private AMQStateManager _stateManager;
+ private AMQProtocolSession _protocolSession;
+ public AMQProtocolSession ProtocolSession { get { return _protocolSession; } }
+ /// <summary>
+ /// Maps from session id (Integer) to AmqChannel instance
+ /// </summary>
+ private readonly IDictionary _sessions = new LinkedHashtable();
+ private ExceptionListenerDelegate _exceptionListener;
+ private IConnectionListener _connectionListener;
+ private ITransport _transport;
+ public ITransport Transport { get { return _transport; } }
+ /// <summary>
+ /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
+ /// message publication.
+ /// </summary>
+ private bool _started;
+ private AMQProtocolListener _protocolListener;
+ public AMQProtocolListener ProtocolListener { get { return _protocolListener; } }
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _transport.ProtocolWriter; }
+ }
+ ProtocolWriter _protocolWriter;
+ public ProtocolWriter ConvenientProtocolWriter
+ {
+ get { return _protocolWriter; }
+ }
+ public AMQConnection(IConnectionInfo connectionInfo)
+ {
+ if (connectionInfo == null)
+ {
+ throw new ArgumentException("ConnectionInfo must be specified");
+ }
+ _log.Debug("ConnectionInfo: " + connectionInfo);
+ _connectionInfo = connectionInfo;
+ _log.Debug("password = " + _connectionInfo.Password);
+ _failoverPolicy = new FailoverPolicy(connectionInfo);
+ // We are not currently connected.
+ _connected = false;
+ Exception lastException = null;
+ do
+ {
+ try
+ {
+ IBrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo();
+ _log.Debug("Connecting to " + brokerInfo);
+ MakeBrokerConnection(brokerInfo);
+ break;
+ }
+ catch (Exception e)
+ {
+ lastException = e;
+ _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
+ // XXX: Should perhaps break out of the do/while here if not a SocketException...
+ }
+ } while (!_connected && _failoverPolicy.FailoverAllowed());
+ _log.Debug("Are we connected:" + _connected);
+ if (!_connected)
+ {
+ if ( lastException is AMQException )
+ {
+ throw lastException;
+ }
+ else
+ {
+ throw new AMQConnectionException("Unable to connect", lastException);
+ }
+ }
+ }
+ /*private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)
+ {
+ //Assembly assembly = Assembly.LoadFrom(assemblyName);
+ Assembly assembly = Assembly.Load(assemblyName);
+ foreach (Type type in assembly.GetTypes())
+ {
+ _log.Debug(String.Format("type = {0}", type));
+ }
+ Type transport = assembly.GetType(transportType);
+ if (transport == null)
+ {
+ throw new ArgumentException(
+ String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName));
+ }
+ _log.Debug("transport = " + transport);
+ _log.Debug("ctors = " + transport.GetConstructors());
+ ConstructorInfo info = transport.GetConstructors()[0];
+ ITransport result = (ITransport)info.Invoke(new object[] { host, port, this });
+ _log.Debug("transport = " + result);
+ return result;
+ }*/
+ public void Disconnect()
+ {
+ _transport.Close();
+ }
+ #region IConnection Members
+ public string ClientID
+ {
+ get
+ {
+ CheckNotClosed();
+ return _connectionInfo.ClientName;
+ }
+ set
+ {
+ CheckNotClosed();
+ _connectionInfo.ClientName = value;
+ }
+ }
+ public override void Close()
+ {
+ lock (FailoverMutex)
+ {
+ // atomically set to closed and check the _previous value was NOT CLOSED
+ if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED)
+ {
+ try
+ {
+ CloseAllSessions(null);
+ CloseConnection();
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error closing connection: " + e);
+ }
+ }
+ }
+ }
+ private void CloseConnection()
+ {
+ _stateManager.ChangeState(AMQState.CONNECTION_CLOSING);
+ AMQFrame frame = ConnectionCloseBody.CreateAMQFrame(
+ 0, 200, "Qpid.NET client is closing the connection.", 0, 0);
+ ProtocolWriter.Write(frame);
+ _log.Debug("Blocking for connection close ok frame");
+ Disconnect();
+ }
+ class CreateChannelFailoverSupport : FailoverSupport
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport));
+ private bool _transacted;
+ private AcknowledgeMode _acknowledgeMode;
+ int _prefetchHigh;
+ int _prefetchLow;
+ AMQConnection _connection;
+ public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
+ {
+ _connection = connection;
+ _transacted = transacted;
+ _acknowledgeMode = acknowledgeMode;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
+ }
+ protected override object operation()
+ {
+ ushort channelId = _connection.NextChannelId();
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Write channel open frame for channel id " + channelId);
+ }
+ // We must create the channel and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AmqChannel channel = new AmqChannel(_connection,
+ channelId, _transacted, _acknowledgeMode, _prefetchHigh, _prefetchLow);
+ _connection.ProtocolSession.AddSessionByChannel(channelId, channel);
+ _connection.RegisterSession(channelId, channel);
+ bool success = false;
+ try
+ {
+ _connection.CreateChannelOverWire(channelId, _prefetchHigh, _prefetchLow, _transacted);
+ success = true;
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error creating channel: " + e, e);
+ }
+ finally
+ {
+ if (!success) {
+ _connection.ProtocolSession.RemoveSessionByChannel(channelId);
+ _connection.DeregisterSession(channelId);
+ }
+ }
+ if (_connection._started)
+ {
+ channel.Start();
+ }
+ return channel;
+ }
+ }
+ internal ushort NextChannelId()
+ {
+ return (ushort) Interlocked.Increment(ref _nextChannelId);
+ }
+ public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode)
+ {
+ return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH_HIGH_MARK);
+ }
+ public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
+ {
+ return CreateChannel(transacted, acknowledgeMode, prefetch, prefetch);
+ }
+ public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetchHigh, int prefetchLow)
+ {
+ CheckNotClosed();
+ if (ChannelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_maximumChannelCount);
+ }
+ else
+ {
+ CreateChannelFailoverSupport operation =
+ new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetchHigh, prefetchLow);
+ return (IChannel)operation.execute(this);
+ }
+ }
+ public void CloseSession(AmqChannel channel)
+ {
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite).
+ _protocolSession.CloseSession(channel);
+ AMQFrame frame = ChannelCloseBody.CreateAMQFrame(
+ channel.ChannelId, 200, "JMS client closing channel", 0, 0);
+ _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId);
+ _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody));
+ _log.Debug("Received channel close frame");
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully
+ }
+ public ExceptionListenerDelegate ExceptionListener
+ {
+ get
+ {
+ CheckNotClosed();
+ return _exceptionListener;
+ }
+ set
+ {
+ CheckNotClosed();
+ _exceptionListener = value;
+ }
+ }
+ /// <summary>
+ /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
+ /// and is not thread safe (which is legal according to the JMS specification).
+ /// @throws JMSException
+ /// </summary>
+ public void Start()
+ {
+ CheckNotClosed();
+ if (!_started)
+ {
+ foreach (DictionaryEntry lde in _sessions)
+ {
+ AmqChannel s = (AmqChannel)lde.Value;
+ s.Start();
+ }
+ _started = true;
+ }
+ }
+ public void Stop()
+ {
+ CheckNotClosed();
+ if (_started)
+ {
+ foreach (DictionaryEntry lde in _sessions)
+ {
+ AmqChannel s = (AmqChannel) lde.Value;
+ s.Stop();
+ }
+ _started = false;
+ }
+ }
+ public IConnectionListener ConnectionListener
+ {
+ get { return _connectionListener; }
+ set { _connectionListener = value; }
+ }
+ #endregion
+ #region IDisposable Members
+ public void Dispose()
+ {
+ Close();
+ }
+ #endregion
+ private bool ChannelLimitReached()
+ {
+ return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount;
+ }
+ /// <summary>
+ /// Close all the sessions, either due to normal connection closure or due to an error occurring.
+ /// @param cause if not null, the error that is causing this shutdown
+ /// </summary>
+ private void CloseAllSessions(Exception cause)
+ {
+ _log.Debug("Closing all session in connection " + this);
+ ICollection sessions = new ArrayList(_sessions.Values);
+ foreach (AmqChannel channel in sessions)
+ {
+ _log.Debug("Closing channel " + channel);
+ if (cause != null)
+ {
+ channel.ClosedWithException(cause);
+ }
+ else
+ {
+ try
+ {
+ channel.Close();
+ }
+ catch (QpidException e)
+ {
+ _log.Error("Error closing channel: " + e);
+ }
+ }
+ }
+ _log.Debug("Done closing all sessions in connection " + this);
+ }
+ public int MaximumChannelCount
+ {
+ get
+ {
+ CheckNotClosed();
+ return _maximumChannelCount;
+ }
+ }
+ internal void SetMaximumChannelCount(ushort maximumChannelCount)
+ {
+ CheckNotClosed();
+ _maximumChannelCount = maximumChannelCount;
+ }
+ public uint MaximumFrameSize
+ {
+ get
+ {
+ return _maximumFrameSize;
+ }
+ set
+ {
+ _maximumFrameSize = value;
+ }
+ }
+ public IDictionary Sessions
+ {
+ get
+ {
+ return _sessions;
+ }
+ }
+ public string Host
+ {
+ get
+ {
+ return _failoverPolicy.GetCurrentBrokerInfo().Host;
+ }
+ }
+ public int Port
+ {
+ get
+ {
+ return _failoverPolicy.GetCurrentBrokerInfo().Port;
+ }
+ }
+ public string Username
+ {
+ get
+ {
+ return _connectionInfo.Username;
+ }
+ }
+ public string Password
+ {
+ get
+ {
+ return _connectionInfo.Password;
+ }
+ }
+ public string VirtualHost
+ {
+ get
+ {
+ return _connectionInfo.VirtualHost;
+ }
+ }
+ /// <summary>
+ /// Invoked by the AMQProtocolSession when a protocol session exception has occurred.
+ /// This method sends the exception to a JMS exception listener, if configured, and
+ /// propagates the exception to sessions, which in turn will propagate to consumers.
+ /// This allows synchronous consumers to have exceptions thrown to them.
+ /// </summary>
+ /// <param name="cause">the exception</param>
+ public void ExceptionReceived(Exception cause)
+ {
+ if (_exceptionListener != null)
+ {
+ // Listener expects one of these...
+ QpidException xe;
+ if (cause is QpidException)
+ {
+ xe = (QpidException) cause;
+ }
+ else
+ {
+ xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause);
+ }
+ // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
+ // so that any generic client code that tries to close the connection will not mess up this error
+ // handling sequence
+ if (cause is IOException)
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ }
+#if __MonoCS__
+ _exceptionListener(xe);
+ _exceptionListener.Invoke(xe);
+ }
+ else
+ {
+ _log.Error("Connection exception: " + cause);
+ }
+ // An undelivered is not fatal to the connections usability.
+ if (!(cause is AMQUndeliveredException))
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ CloseAllSessions(cause);
+ }
+ else
+ {
+ ;
+ }
+ }
+ internal void RegisterSession(int channelId, AmqChannel channel)
+ {
+ _sessions[channelId] = channel;
+ }
+ internal void DeregisterSession(int channelId)
+ {
+ _sessions.Remove(channelId);
+ }
+ /**
+ * Fire the preFailover event to the registered connection listener (if any)
+ *
+ * @param redirect true if this is the result of a redirect request rather than a connection error
+ * @return true if no listener or listener does not veto change
+ */
+ public bool FirePreFailover(bool redirect)
+ {
+ bool proceed = true;
+ if (_connectionListener != null)
+ {
+ proceed = _connectionListener.PreFailover(redirect);
+ }
+ return proceed;
+ }
+ /**
+ * Fire the preResubscribe event to the registered connection listener (if any). If the listener
+ * vetoes resubscription then all the sessions are closed.
+ *
+ * @return true if no listener or listener does not veto resubscription.
+ * @throws JMSException
+ */
+ public bool FirePreResubscribe()
+ {
+ if (_connectionListener != null)
+ {
+ bool resubscribe = _connectionListener.PreResubscribe();
+ if (!resubscribe)
+ {
+ MarkAllSessionsClosed();
+ }
+ return resubscribe;
+ }
+ else
+ {
+ return true;
+ }
+ }
+ /**
+ * Marks all sessions and their children as closed without sending any protocol messages. Useful when
+ * you need to mark objects "visible" in userland as closed after failover or other significant event that
+ * impacts the connection.
+ * <p/>
+ * The caller must hold the failover mutex before calling this method.
+ */
+ private void MarkAllSessionsClosed()
+ {
+ //LinkedList sessionCopy = new LinkedList(_sessions.values());
+ ArrayList sessionCopy = new ArrayList(_sessions.Values);
+ foreach (AmqChannel session in sessionCopy)
+ {
+ session.MarkClosed();
+ }
+ _sessions.Clear();
+ }
+ /**
+ * Fires a failover complete event to the registered connection listener (if any).
+ */
+ public void FireFailoverComplete()
+ {
+ if (_connectionListener != null)
+ {
+ _connectionListener.FailoverComplete();
+ }
+ }
+ public bool AttemptReconnection(String host, int port, SslOptions sslConfig)
+ {
+ IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig);
+ _failoverPolicy.setBroker(bd);
+ try
+ {
+ MakeBrokerConnection(bd);
+ return true;
+ }
+ catch (Exception e)
+ {
+ _log.Debug("Unable to connect to broker at " + bd, e);
+ AttemptReconnection();
+ }
+ return false;
+ }
+ private void MakeBrokerConnection(IBrokerInfo brokerDetail)
+ {
+ try
+ {
+ _stateManager = new AMQStateManager();
+ _protocolListener = new AMQProtocolListener(this, _stateManager);
+ _protocolListener.AddFrameListener(_stateManager);
+ /*
+ // Currently there is only one transport option - BlockingSocket.
+ String assemblyName = "Apache.Qpid.Client.Transport.Socket.Blocking.dll";
+ String transportType = "Apache.Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport";
+ // Load the transport assembly dynamically.
+ _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType);
+ */
+ _transport = new BlockingSocketTransport();
+ // Connect.
+ _transport.Connect(brokerDetail, this);
+ _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener);
+ _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this);
+ _protocolListener.ProtocolSession = _protocolSession;
+ // Now start the connection "handshake".
+ _transport.ProtocolWriter.Write(new ProtocolInitiation());
+ // Blocks until the connection has been opened.
+ _stateManager.AttainState(AMQState.CONNECTION_OPEN);
+ _failoverPolicy.attainedConnection();
+ // XXX: Again this should be changed to a suitable notify.
+ _connected = true;
+ }
+ catch (AMQException e)
+ {
+ _lastAMQException = e;
+ throw; // rethrow
+ }
+ }
+ public bool AttemptReconnection()
+ {
+ while (_failoverPolicy.FailoverAllowed())
+ {
+ try
+ {
+ MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo());
+ return true;
+ }
+ catch (Exception e)
+ {
+ if (!(e is AMQException))
+ {
+ _log.Debug("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e);
+ }
+ else
+ {
+ _log.Debug(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo());
+ }
+ }
+ }
+ // Connection unsuccessful.
+ return false;
+ }
+ /**
+ * For all channels, and for all consumers in those sessions, resubscribe. This is called during failover handling.
+ * The caller must hold the failover mutex before calling this method.
+ */
+ public void ResubscribeChannels()
+ {
+ ArrayList channels = new ArrayList(_sessions.Values);
+ _log.Debug(String.Format("Resubscribing sessions = {0} sessions.size={1}", channels, channels.Count));
+ foreach (AmqChannel channel in channels)
+ {
+ _protocolSession.AddSessionByChannel(channel.ChannelId, channel);
+ ReopenChannel(
+ channel.ChannelId,
+ channel.DefaultPrefetchHigh,
+ channel.DefaultPrefetchLow,
+ channel.Transacted
+ );
+ channel.ReplayOnFailOver();
+ }
+ }
+ private void ReopenChannel(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
+ {
+ _log.Debug(string.Format("Reopening channel id={0} prefetchHigh={1} prefetchLow={2} transacted={3}",
+ channelId, prefetchHigh, prefetchLow, transacted));
+ try
+ {
+ CreateChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ }
+ catch (AMQException e)
+ {
+ _protocolSession.RemoveSessionByChannel(channelId);
+ DeregisterSession(channelId);
+ throw new AMQException("Error reopening channel " + channelId + " after failover: " + e);
+ }
+ }
+ void CreateChannelOverWire(ushort channelId, int prefetchHigh, int prefetchLow, bool transacted)
+ {
+ _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody));
+ // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We
+ // know this when we connection using AMQP 0.7
+ if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7)
+ {
+ // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d.
+ _protocolWriter.SyncWrite(BasicQosBody.CreateAMQFrame(channelId, 0, (ushort)prefetchHigh, false), typeof (BasicQosOkBody));
+ }
+ if (transacted)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Issuing TxSelect for " + channelId);
+ }
+ _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody));
+ }
+ }
+ public String toURL()
+ {
+ return _connectionInfo.AsUrl();
+ }
+ class HeartBeatThread
+ {
+ int _heartbeatMillis;
+ IProtocolWriter _protocolWriter;
+ bool _run = true;
+ public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis)
+ {
+ _protocolWriter = protocolWriter;
+ _heartbeatMillis = heartbeatMillis;
+ }
+ public void Run()
+ {
+ while (_run)
+ {
+ Thread.Sleep(_heartbeatMillis);
+ if (!_run) break;
+ _log.Debug("Sending heartbeat");
+ // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker?
+ _protocolWriter.Write(HeartbeatBody.FRAME);
+ }
+ _log.Debug("Heatbeat thread stopped");
+ }
+ public void Stop()
+ {
+ _run = false;
+ }
+ }
+ public void StartHeartBeatThread(int heartbeatSeconds)
+ {
+ _log.Debug("Starting new heartbeat thread");
+ _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000);
+ _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run));
+ _heartBeatThread.Name = "HeartBeat";
+ _heartBeatThread.Start();
+ }
+ public void StopHeartBeatThread()
+ {
+ if (_heartBeatRunner != null)
+ {
+ _log.Debug("Stopping old heartbeat thread");
+ _heartBeatRunner.Stop();
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQConnectionException.cs b/qpid/dotnet/Qpid.Client/Client/AMQConnectionException.cs
new file mode 100644
index 0000000000..c8a48814bb
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AMQConnectionException.cs
@@ -0,0 +1,38 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+namespace Apache.Qpid.Client
+ [Serializable]
+ public class AMQConnectionException : AMQException
+ {
+ public AMQConnectionException(String message, Exception e) : base(message, e)
+ {
+ }
+ protected AMQConnectionException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQDestination.cs b/qpid/dotnet/Qpid.Client/Client/AMQDestination.cs
new file mode 100644
index 0000000000..07ce3c2354
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AMQDestination.cs
@@ -0,0 +1,234 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+namespace Apache.Qpid.Client
+ public abstract class AMQDestination
+ {
+ protected readonly string _exchangeName;
+ protected readonly string _exchangeClass;
+ protected readonly string _destinationName;
+ protected readonly bool _isExclusive;
+ protected readonly bool _isAutoDelete;
+ protected bool _isDurable;
+ public bool IsDurable
+ {
+ get { return _isDurable; }
+ }
+ protected string _queueName;
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, bool isExclusive,
+ bool isAutoDelete, String queueName)
+ {
+ // XXX: This is ugly - OnlyRequired because of ReplyToDestination.
+// if (destinationName == null)
+// {
+// throw new ArgumentNullException("destinationName");
+// }
+ // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter.
+// if (exchangeName == null)
+// {
+// throw new ArgumentNullException("exchangeName");
+// }
+ // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter.
+// if (exchangeClass == null)
+// {
+// throw new ArgumentNullException("exchangeClass");
+// }
+ _exchangeName = exchangeName;
+ _exchangeClass = exchangeClass;
+ _destinationName = destinationName;
+ _isExclusive = isExclusive;
+ _isAutoDelete = isAutoDelete;
+ _queueName = queueName;
+ }
+ public string Name
+ {
+ get
+ {
+ return _destinationName;
+ }
+ }
+ public abstract string RoutingKey
+ {
+ get;
+ }
+ public abstract string EncodedName
+ {
+ get;
+ }
+ public bool AutoDelete
+ {
+ get
+ {
+ return _isAutoDelete;
+ }
+ }
+ public string QueueName
+ {
+ get
+ {
+ return _queueName;
+ }
+ set
+ {
+ _queueName = value;
+ }
+ }
+ public string ExchangeName
+ {
+ get
+ {
+ return _exchangeName;
+ }
+ }
+ public string ExchangeClass
+ {
+ get
+ {
+ return _exchangeClass;
+ }
+ }
+ public bool IsExclusive
+ {
+ get
+ {
+ return _isExclusive;
+ }
+ }
+ public bool IsAutoDelete
+ {
+ get
+ {
+ return _isAutoDelete;
+ }
+ }
+ public override string ToString()
+ {
+ return "Destination: " + _destinationName + ", " +
+ "Queue Name: " + _queueName + ", Exchange: " + _exchangeName +
+ ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive +
+ ", AutoDelete: " + _isAutoDelete; // +", Routing Key: " + RoutingKey;
+ }
+ public override bool Equals(object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || GetType() != o.GetType())
+ {
+ return false;
+ }
+ AMQDestination that = (AMQDestination) o;
+ if (!StringsNotEqualNullSafe(_destinationName, that._destinationName))
+ {
+ return false;
+ }
+ if (!StringsNotEqualNullSafe(_exchangeClass, that._exchangeClass))
+ {
+ return false;
+ }
+ if (!StringsNotEqualNullSafe(_exchangeName, that._exchangeName))
+ {
+ return false;
+ }
+ if (!StringsNotEqualNullSafe(_queueName, that._queueName))
+ {
+ return false;
+ }
+ if (_isExclusive != that._isExclusive)
+ {
+ return false;
+ }
+ if (_isAutoDelete != that._isAutoDelete)
+ {
+ return false;
+ }
+ return true;
+ }
+ private bool StringsNotEqualNullSafe(string one, string two)
+ {
+ if ((one == null && two != null) ||
+ (one != null && !one.Equals(two)))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+ public override int GetHashCode()
+ {
+ int result;
+ if (_exchangeName == null)
+ {
+ result = "".GetHashCode();
+ }
+ else
+ {
+ result = _exchangeName.GetHashCode();
+ }
+ if (_exchangeClass != null)
+ {
+ result = 29 * result + _exchangeClass.GetHashCode();
+ }
+ if (_destinationName != null)
+ {
+ result = 29 * result + _destinationName.GetHashCode();
+ }
+ if (_queueName != null)
+ {
+ result = 29 * result + _queueName.GetHashCode();
+ }
+ result = result * (_isExclusive ? 13 : 7);
+ result = result * (_isAutoDelete ? 13 : 7);
+ Console.WriteLine("FIXME HashCode for " + this + " = " + result);
+ return result;
+ }
+ public abstract bool IsNameRequired { get; }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs b/qpid/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs
new file mode 100644
index 0000000000..5c9dd86c53
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs
@@ -0,0 +1,45 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Apache.Qpid.Common;
+using Apache.Qpid.Protocol;
+namespace Apache.Qpid.Client
+ [Serializable]
+ public class AMQNoConsumersException : AMQUndeliveredException
+ {
+ public AMQNoConsumersException(string message)
+ : this(message, null)
+ {
+ }
+ public AMQNoConsumersException(string message, object bounced)
+ : base(AMQConstant.NO_CONSUMERS.Code, message, bounced)
+ {
+ }
+ protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/AMQNoRouteException.cs b/qpid/dotnet/Qpid.Client/Client/AMQNoRouteException.cs
new file mode 100644
index 0000000000..5868d78f32
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AMQNoRouteException.cs
@@ -0,0 +1,46 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Apache.Qpid.Common;
+using Apache.Qpid.Protocol;
+namespace Apache.Qpid.Client
+ [Serializable]
+ public class AMQNoRouteException : AMQUndeliveredException
+ {
+ public AMQNoRouteException(string message)
+ : this(message, null)
+ {
+ }
+ public AMQNoRouteException(string message, object bounced)
+ : base(AMQConstant.NO_ROUTE.Code, message, bounced)
+ {
+ }
+ protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
new file mode 100644
index 0000000000..591c5b941f
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
@@ -0,0 +1,322 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using Apache.Qpid.Client.Qms;
+namespace Apache.Qpid.Client
+ public class AmqBrokerInfo : IBrokerInfo
+ {
+ public readonly string URL_FORMAT_EXAMPLE =
+ "<transport>://<hostname>[:<port Default=\""+BrokerInfoConstants.DEFAULT_PORT+"\">][?<option>='<value>'[,<option>='<value>']]";
+ public const long DEFAULT_CONNECT_TIMEOUT = 30000L;
+ private string _host = "localhost";
+ private int _port = 5672;
+ private string _transport = "amqp";
+ private Hashtable _options = new Hashtable();
+ private SslOptions _sslOptions;
+ public AmqBrokerInfo()
+ {
+ }
+ public AmqBrokerInfo(string url)
+ {
+ // URL should be of format tcp://host:port?option='value',option='value'
+ try
+ {
+ Uri connection = new Uri(url);
+ String transport = connection.Scheme;
+ // Handles some defaults to minimise changes to existing broker URLS e.g. localhost
+ if (transport != null)
+ {
+ transport = transport.ToLower();
+ //todo this list of valid transports should be enumerated somewhere
+ if ((!(transport.Equals("vm") || transport.Equals("tcp"))))
+ {
+ if (transport.Equals("localhost"))
+ {
+ connection = new Uri(BrokerInfoConstants.DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.Scheme;
+ }
+ else
+ {
+ if (url[transport.Length] == ':' && url[transport.Length + 1] != '/')
+ {
+ //Then most likely we have a host:port value
+ connection = new Uri(BrokerInfoConstants.DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.Scheme;
+ }
+ else
+ {
+ URLHelper.parseError(0, transport.Length, "Unknown transport", url);
+ }
+ }
+ }
+ }
+ else
+ {
+ //Default the transport
+ connection = new Uri(BrokerInfoConstants.DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.Scheme;
+ }
+ if (transport == null)
+ {
+ URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
+ " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
+ }
+ Transport = transport;
+ String host = connection.Host;
+ if (!host.Equals("default")) Host = host;
+ int port = connection.Port;
+ if (port == -1)
+ {
+ // Fix for when there is port data but it is not automatically parseable by getPort().
+ String auth = connection.Authority;
+ if (auth != null && auth.IndexOf(':') != -1)
+ {
+ int start = auth.IndexOf(":") + 1;
+ int end = start;
+ bool looking = true;
+ bool found = false;
+ //Walk the authority looking for a port value.
+ while (looking)
+ {
+ try
+ {
+ end++;
+ int.Parse(auth.Substring(start, end-start+1));
+ if (end >= auth.Length)
+ {
+ looking = false;
+ found = true;
+ }
+ }
+ catch (FormatException)
+ {
+ looking = false;
+ }
+ }
+ if (found)
+ {
+ Port = int.Parse(auth.Substring(start, end-start+1));
+ }
+ else
+ {
+ URLHelper.parseError(connection.ToString().IndexOf(connection.Authority) + end - 1,
+ "Illegal character in port number", connection.ToString());
+ }
+ }
+ else
+ {
+ Port = BrokerInfoConstants.DEFAULT_PORT;
+ }
+ }
+ else
+ {
+ Port = port;
+ }
+ String queryString = connection.Query;
+ if (queryString.Length > 0 && queryString[0] == '?')
+ {
+ queryString = queryString.Substring(1);
+ }
+ URLHelper.parseOptions(_options, queryString);
+ //Fragment is #string (not used)
+ }
+ catch (UriFormatException uris)
+ {
+ throw uris;
+// if (uris is UrlSyntaxException)
+// {
+// throw uris;
+// }
+// URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ }
+ public AmqBrokerInfo(string transport, string host, int port, bool useSSL) : this()
+ {
+ _transport = transport;
+ _host = host;
+ _port = port;
+ if (useSSL)
+ {
+ SetOption(BrokerInfoConstants.OPTIONS_SSL, "true");
+ }
+ }
+ public AmqBrokerInfo(string transport, string host, int port, SslOptions sslConfig)
+ : this()
+ {
+ _transport = transport;
+ _host = host;
+ _port = port;
+ if ( sslConfig != null )
+ {
+ SetOption(BrokerInfoConstants.OPTIONS_SSL, "true");
+ _sslOptions = sslConfig;
+ }
+ }
+ public string Host
+ {
+ get { return _host; }
+ set { _host = value; }
+ }
+ public int Port
+ {
+ get { return _port; }
+ set { _port = value; }
+ }
+ public string Transport
+ {
+ get { return _transport; }
+ set { _transport = value; }
+ }
+ public SslOptions SslOptions
+ {
+ get { return _sslOptions; }
+ }
+ public string GetOption(string key)
+ {
+ return (string)_options[key];
+ }
+ public void SetOption(string key, string value)
+ {
+ _options[key] = value;
+ }
+ public long Timeout
+ {
+ get
+ {
+ if ( _options.ContainsKey(BrokerInfoConstants.OPTIONS_CONNECT_TIMEOUT) )
+ {
+ try
+ {
+ return long.Parse(GetOption(BrokerInfoConstants.OPTIONS_CONNECT_TIMEOUT));
+ } catch ( FormatException )
+ {
+ //Do nothing as we will use the default below.
+ }
+ }
+ return BrokerInfoConstants.DEFAULT_CONNECT_TIMEOUT;
+ }
+ set
+ {
+ SetOption(BrokerInfoConstants.OPTIONS_CONNECT_TIMEOUT, value.ToString());
+ }
+ }
+ public override string ToString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.Append(_transport);
+ sb.Append("://");
+ if (!(StringEqualsIgnoreCase(_transport, "vm")))
+ {
+ sb.Append(_host);
+ }
+ sb.Append(':');
+ sb.Append(_port);
+ sb.Append(URLHelper.printOptions(_options));
+ return sb.ToString();
+ }
+ public override bool Equals(object obj)
+ {
+ if (!(obj is IBrokerInfo))
+ {
+ return false;
+ }
+ IBrokerInfo bd = (IBrokerInfo) obj;
+ return StringEqualsIgnoreCase(_host, bd.Host) &&
+ _port == bd.Port &&
+ StringEqualsIgnoreCase(_transport, bd.Transport) &&
+ UseSSL == bd.UseSSL;
+ }
+ public override int GetHashCode()
+ {
+ return _host.ToLower().GetHashCode() ^ _port.GetHashCode();
+ }
+ // TODO: move to util class.
+ private bool StringEqualsIgnoreCase(string one, string two)
+ {
+ return one.ToLower().Equals(two.ToLower());
+ }
+ public bool UseSSL
+ {
+ get
+ {
+ // To be friendly to users we should be case insensitive.
+ // or simply force users to conform to OPTIONS_SSL
+ // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
+ if ( _options.ContainsKey(BrokerInfoConstants.OPTIONS_SSL) )
+ {
+ return StringEqualsIgnoreCase(GetOption(BrokerInfoConstants.OPTIONS_SSL), "true");
+ }
+ return false;
+ }
+ set
+ {
+ SetOption(BrokerInfoConstants.OPTIONS_SSL, value.ToString());
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
new file mode 100644
index 0000000000..84c7c06fe1
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -0,0 +1,1241 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text.RegularExpressions;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Util;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Protocol;
+namespace Apache.Qpid.Client
+ /// <summary>
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Declare queues.
+ /// <tr><td> Declare exchanges.
+ /// <tr><td> Bind queues to exchanges.
+ /// <tr><td> Create messages.
+ /// <tr><td> Set up message consumers on the channel.
+ /// <tr><td> Set up message producers on the channel.
+ /// <tr><td> Commit the current transaction.
+ /// <tr><td> Roll-back the current transaction.
+ /// <tr><td> Close the channel.
+ /// </table>
+ /// </summary>
+ public class AmqChannel : Closeable, IChannel
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
+ internal const int BASIC_CONTENT_TYPE = 60;
+ public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+ public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
+ private static int _nextSessionNumber = 0;
+ private AMQConnection _connection;
+ private int _sessionNumber;
+ private bool _suspended;
+ private object _suspensionLock = new object();
+ // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
+ private int _nextConsumerNumber = 1;
+ private bool _transacted;
+ private AcknowledgeMode _acknowledgeMode;
+ private ushort _channelId;
+ private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+ private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ private FlowControlQueue _queue;
+ private Dispatcher _dispatcher;
+ private MessageFactoryRegistry _messageFactoryRegistry;
+ /// <summary> Holds all of the producers created by this channel. </summary>
+ private Hashtable _producers = Hashtable.Synchronized(new Hashtable());
+ /// <summary> Holds all of the consumers created by this channel. </summary>
+ private Hashtable _consumers = Hashtable.Synchronized(new Hashtable());
+ private ArrayList _replayFrames = new ArrayList();
+ /// <summary>
+ /// The counter of the _next producer id. This id is generated by the session and used only to allow the
+ /// producer to identify itself to the session when deregistering itself.
+ ///
+ /// Access to this id does not require to be synchronized since according to the JMS specification only one
+ /// thread of control is allowed to create producers for any given session instance.
+ /// </summary>
+ private long _nextProducerId;
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AmqChannel"/> class.
+ /// </summary>
+ /// <param name="con">The connection.</param>
+ /// <param name="channelId">The channel id.</param>
+ /// <param name="transacted">if set to <c>true</c> [transacted].</param>
+ /// <param name="acknowledgeMode">The acknowledge mode.</param>
+ /// <param name="defaultPrefetchHigh">Default prefetch high value</param>
+ /// <param name="defaultPrefetchLow">Default prefetch low value</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
+ int defaultPrefetchHigh, int defaultPrefetchLow)
+ : this()
+ {
+ _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
+ _connection = con;
+ _transacted = transacted;
+ if ( transacted )
+ {
+ _acknowledgeMode = AcknowledgeMode.SessionTransacted;
+ }
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
+ _channelId = channelId;
+ _defaultPrefetchHighMark = defaultPrefetchHigh;
+ _defaultPrefetchLowMark = defaultPrefetchLow;
+ if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
+ {
+ _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark,
+ new ThresholdMethod(OnPrefetchLowMark),
+ new ThresholdMethod(OnPrefetchHighMark));
+ }
+ else
+ {
+ // low and upper are the same
+ _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark,
+ null, null);
+ }
+ }
+ private AmqChannel()
+ {
+ _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
+ }
+ /// <summary>
+ /// Acknowledge mode for messages received.
+ /// </summary>
+ public AcknowledgeMode AcknowledgeMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _acknowledgeMode;
+ }
+ }
+ /// <summary>
+ /// True if the channel should use transactions.
+ /// </summary>
+ public bool Transacted
+ {
+ get
+ {
+ CheckNotClosed();
+ return _transacted;
+ }
+ }
+ /// <summary>
+ /// Prefetch value to be used as the default for
+ /// consumers created on this channel.
+ /// </summary>
+ public int DefaultPrefetch
+ {
+ get { return DefaultPrefetchHigh; }
+ }
+ /// <summary>
+ /// Prefetch low value to be used as the default for
+ /// consumers created on this channel.
+ /// </summary>
+ public int DefaultPrefetchLow
+ {
+ get { return _defaultPrefetchLowMark; }
+ }
+ /// <summary>
+ /// Prefetch high value to be used as the default for
+ /// consumers created on this channel.
+ /// </summary>
+ public int DefaultPrefetchHigh
+ {
+ get { return _defaultPrefetchHighMark; }
+ }
+ /// <summary> Indicates whether or not this channel is currently suspended. </summary>
+ public bool IsSuspended
+ {
+ get { return _suspended; }
+ }
+ /// <summary> Provides the channels number within the the connection. </summary>
+ public ushort ChannelId
+ {
+ get { return _channelId; }
+ }
+ /// <summary> Provides the connection that this channel runs over. </summary>
+ public AMQConnection Connection
+ {
+ get { return _connection; }
+ }
+ /// <summary>
+ /// Declare a new exchange.
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
+ /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param>
+ public void DeclareExchange(String exchangeName, String exchangeClass)
+ {
+ _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass));
+ DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null);
+ }
+ /// <summary>
+ /// Declare a new exchange using the default exchange class.
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
+ public void DeleteExchange(string exchangeName)
+ {
+ throw new NotImplementedException();
+ }
+ /// <summary>
+ /// Declare a new queue with the specified set of arguments.
+ /// </summary>
+ /// <param name="queueName">Name of the queue</param>
+ /// <param name="isDurable">True if the queue should be durable</param>
+ /// <param name="isExclusive">True if the queue should be exclusive to this channel</param>
+ /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
+ public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
+ {
+ DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, null);
+ }
+ /// <summary>
+ /// Declare a new queue with the specified set of arguments.
+ /// </summary>
+ /// <param name="queueName">Name of the queue</param>
+ /// <param name="isDurable">True if the queue should be durable</param>
+ /// <param name="isExclusive">True if the queue should be exclusive to this channel</param>
+ /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
+ /// <param name="args">Optional arguments to Queue.Declare</param>
+ public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args)
+ {
+ DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, args);
+ }
+ /// <summary>
+ /// Delete a queue with the specifies arguments.
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param>
+ /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
+ {
+ DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
+ }
+ /// <summary>
+ /// Generate a new Unique name to use for a queue.
+ /// </summary>
+ /// <returns>A unique name to this channel</returns>
+ public string GenerateUniqueName()
+ {
+ string result = _connection.ProtocolSession.GenerateQueueName();
+ return Regex.Replace(result, "[^a-z0-9_]", "_");
+ }
+ /// <summary>
+ /// Removes all messages from a queue.
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ public void PurgeQueue(string queueName, bool noWait)
+ {
+ DoPurgeQueue(queueName, noWait);
+ }
+ /// <summary>
+ /// Bind a queue to the specified exchange.
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
+ public void Bind(string queueName, string exchangeName, string routingKey)
+ {
+ DoBind(queueName, exchangeName, routingKey, new FieldTable());
+ }
+ /// <summary>
+ /// Bind a queue to the specified exchange.
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param>
+ public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args)
+ {
+ DoBind(queueName, exchangeName, routingKey, (FieldTable)args);
+ }
+ /// <summary>
+ /// Create a new empty message with no body.
+ /// </summary>
+ /// <returns>The new message</returns>
+ public IMessage CreateMessage()
+ {
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+ /// <summary>
+ /// Create a new message of the specified MIME type.
+ /// </summary>
+ /// <param name="mimeType">The mime type to create</param>
+ /// <returns>The new message</returns>
+ public IMessage CreateMessage(string mimeType)
+ {
+ return _messageFactoryRegistry.CreateMessage(mimeType);
+ }
+ /// <summary>
+ /// Creates a new message for bytes (application/octet-stream).
+ /// </summary>
+ /// <returns>The new message</returns>
+ public IBytesMessage CreateBytesMessage()
+ {
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+ /// <summary>
+ /// Creates a new text message (text/plain) with empty content.
+ /// </summary>
+ /// <returns>The new message</returns>
+ public ITextMessage CreateTextMessage()
+ {
+ return CreateTextMessage(String.Empty);
+ }
+ /// <summary>
+ /// Creates a new text message (text/plain) with a body.
+ /// </summary>
+ /// <param name="text">Initial body of the message</param>
+ /// <returns>The new message</returns>
+ public ITextMessage CreateTextMessage(string text)
+ {
+ ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+ msg.Text = text;
+ return msg;
+ }
+ /// <summary>
+ /// Creates a new Consumer using the builder pattern.
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <returns>The builder object</returns>
+ public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
+ {
+ return new MessageConsumerBuilder(this, queueName);
+ }
+ /// <summary>
+ /// Creates a new consumer.
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <param name="prefetchLow">Low prefetch value</param>
+ /// <param name="prefetchHigh">High prefetch value</param>
+ /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param>
+ /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param>
+ /// <returns>The new consumer</returns>
+ public IMessageConsumer CreateConsumer(string queueName,
+ int prefetchLow,
+ int prefetchHigh,
+ bool noLocal,
+ bool exclusive)
+ {
+ _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ",
+ queueName, prefetchLow, prefetchHigh, noLocal, exclusive));
+ return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, false);
+ }
+ /// <summary>
+ /// Creates a new consumer.
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <param name="prefetchLow">Low prefetch value</param>
+ /// <param name="prefetchHigh">High prefetch value</param>
+ /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param>
+ /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param>
+ /// <param name="browse">If true, the consumer only browses and does not consume messages</param>
+ /// <returns>The new consumer</returns>
+ public IMessageConsumer CreateConsumer(string queueName,
+ int prefetchLow,
+ int prefetchHigh,
+ bool noLocal,
+ bool exclusive,
+ bool browse)
+ {
+ _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} browse={5}",
+ queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse));
+ return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse);
+ }
+ /// <summary>
+ /// Unsubscribe from a queue.
+ /// </summary>
+ /// <param name="subscriptionName">Subscription name</param>
+ public void Unsubscribe(String name)
+ {
+ throw new NotImplementedException();
+ }
+ /// <summary>
+ /// Create a new message publisher using the builder pattern.
+ /// </summary>
+ /// <returns>The builder object</returns>
+ public MessagePublisherBuilder CreatePublisherBuilder()
+ {
+ return new MessagePublisherBuilder(this);
+ }
+ /// <summary>
+ /// Create a new message publisher.
+ /// </summary>
+ /// <param name="exchangeName">Name of exchange to publish to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="deliveryMode">Default delivery mode</param>
+ /// <param name="timeToLive">Default TTL time of messages</param>
+ /// <param name="immediate">If true, sent immediately</param>
+ /// <param name="mandatory">If true, the broker will return an error
+ /// (as a connection exception) if the message cannot be delivered</param>
+ /// <param name="priority">Default message priority</param>
+ /// <returns>The new message publisher</returns>
+ public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode,
+ long timeToLive, bool immediate, bool mandatory, int priority)
+ {
+ _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}",
+ exchangeName, "none", routingKey));
+ return CreateProducerImpl(exchangeName, routingKey, deliveryMode,
+ timeToLive, immediate, mandatory, priority);
+ }
+ /// <summary>
+ /// Recover after transaction failure.
+ /// </summary>
+ /// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks>
+ public void Recover()
+ {
+ CheckNotClosed();
+ CheckNotTransacted();
+ throw new NotImplementedException();
+ }
+ /// <summary>
+ /// Commit the transaction.
+ /// </summary>
+ public void Commit()
+ {
+ // FIXME: Fail over safety. Needs FailoverSupport?
+ CheckNotClosed();
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+ try
+ {
+ // Acknowledge up to message last delivered (if any) for each consumer.
+ // Need to send ack for messages delivered to consumers so far.
+ foreach (BasicMessageConsumer consumer in _consumers.Values)
+ {
+ // Sends acknowledgement to server.
+ consumer.AcknowledgeDelivered();
+ }
+ // Commits outstanding messages sent and outstanding acknowledgements.
+ _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody));
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Failed to commit", e);
+ }
+ }
+ /// <summary>
+ /// Rollback the transaction.
+ /// </summary>
+ public void Rollback()
+ {
+ lock (_suspensionLock)
+ {
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+ try
+ {
+ bool suspended = IsSuspended;
+ if (!suspended)
+ {
+ Suspend(true);
+ }
+ // Reject up to message last delivered (if any) for each consumer.
+ // Need to send reject for messages delivered to consumers so far.
+ foreach (BasicMessageConsumer consumer in _consumers.Values)
+ {
+ // Sends acknowledgement to server.
+ consumer.RejectUnacked();
+ }
+ _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+ if ( !suspended )
+ {
+ Suspend(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Failed to rollback", e);
+ }
+ }
+ }
+ /// <summary>
+ /// Create a disconnected channel that will fault
+ /// for most things, but is useful for testing
+ /// </summary>
+ /// <returns>A new disconnected channel</returns>
+ public static IChannel CreateDisconnectedChannel()
+ {
+ return new AmqChannel();
+ }
+ public override void Close()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session
+ lock (_closingLock)
+ {
+ SetClosed();
+ // we pass null since this is not an error case
+ CloseProducersAndConsumers(null);
+ try
+ {
+ _connection.CloseSession(this);
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error closing session: " + e);
+ }
+ finally
+ {
+ _connection.DeregisterSession(_channelId);
+ }
+ }
+ }
+ }
+ /**
+ * Called when the server initiates the closure of the session
+ * unilaterally.
+ * @param e the exception that caused this session to be closed. Null causes the
+ */
+ public void ClosedWithException(Exception e)
+ {
+ lock (_connection.FailoverMutex)
+ {
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ SetClosed();
+ AMQException amqe;
+ if (e is AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
+ _connection.DeregisterSession(_channelId);
+ CloseProducersAndConsumers(amqe);
+ }
+ }
+ public void MessageReceived(UnprocessedMessage message)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Message received in session with channel id " + _channelId);
+ }
+ if ( message.DeliverBody == null )
+ {
+ ReturnBouncedMessage(message);
+ }
+ else
+ {
+ _queue.Enqueue(message);
+ }
+ }
+ public void Dispose()
+ {
+ Close();
+ }
+ private void SetClosed()
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ }
+ /// <summary>
+ /// Close all producers or consumers. This is called either in the error case or when closing the session normally.
+ /// <param name="amqe">the exception, may be null to indicate no error has occurred</param>
+ ///
+ private void CloseProducersAndConsumers(AMQException amqe)
+ {
+ try
+ {
+ CloseProducers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ try
+ {
+ CloseConsumers(amqe);
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ }
+ /// <summary>
+ /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
+ /// currently no way of propagating errors to message producers (this is a JMS limitation).
+ /// </summary>
+ private void CloseProducers()
+ {
+ _logger.Debug("Closing producers on session " + this);
+ // we need to clone the list of producers since the close() method updates the _producers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedProducers = new ArrayList(_producers.Values);
+ foreach (BasicMessageProducer prod in clonedProducers)
+ {
+ _logger.Debug("Closing producer " + prod);
+ prod.Close();
+ }
+ // at this point the _producers map is empty
+ }
+ /// <summary>
+ /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
+ /// <param name="error">not null if this is a result of an error occurring at the connection level</param>
+ private void CloseConsumers(Exception error)
+ {
+ if (_dispatcher != null)
+ {
+ _dispatcher.StopDispatcher();
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedConsumers = new ArrayList(_consumers.Values);
+ foreach (BasicMessageConsumer con in clonedConsumers)
+ {
+ if (error != null)
+ {
+ con.NotifyError(error);
+ }
+ else
+ {
+ con.Close();
+ }
+ }
+ // at this point the _consumers map will be empty
+ }
+ private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey,
+ DeliveryMode deliveryMode,
+ long timeToLive, bool immediate, bool mandatory, int priority)
+ {
+ lock (_closingLock)
+ {
+ CheckNotClosed();
+ try
+ {
+ return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId,
+ this, GetNextProducerId(),
+ deliveryMode, timeToLive, immediate, mandatory, priority);
+ }
+ catch (AMQException e)
+ {
+ _logger.Error("Error creating message producer: " + e, e);
+ throw new QpidException("Error creating message producer", e);
+ }
+ }
+ }
+ /// <summary> Creates a message consumer on this channel.</summary>
+ ///
+ /// <param name="queueName">The name of the queue to attach the consumer to.</param>
+ /// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param>
+ /// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param>
+ /// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param>
+ /// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param>
+ ///
+ /// <return>The message consumer.</return>
+ private IMessageConsumer CreateConsumerImpl(string queueName,
+ int prefetchLow,
+ int prefetchHigh,
+ bool noLocal,
+ bool exclusive,
+ bool browse)
+ {
+ lock (_closingLock)
+ {
+ CheckNotClosed();
+ BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal,
+ _messageFactoryRegistry, this,
+ prefetchHigh, prefetchLow, exclusive,
+ browse);
+ try
+ {
+ RegisterConsumer(consumer);
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error registering consumer: " + e, e);
+ }
+ return consumer;
+ }
+ }
+ private void CheckTransacted()
+ {
+ if (!Transacted)
+ {
+ throw new InvalidOperationException("Channel is not transacted");
+ }
+ }
+ private void CheckNotTransacted()
+ {
+ if (Transacted)
+ {
+ throw new InvalidOperationException("Channel is transacted");
+ }
+ }
+ internal void Start()
+ {
+ _dispatcher = new Dispatcher(this);
+ Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher));
+ dispatcherThread.IsBackground = true;
+ dispatcherThread.Start();
+ }
+ internal void Stop()
+ {
+ Suspend(true);
+ if (_dispatcher != null)
+ {
+ _dispatcher.StopDispatcher();
+ }
+ }
+ internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer)
+ {
+ _consumers[consumerTag] = consumer;
+ }
+ /// <summary>
+ /// Called by the MessageConsumer when closing, to deregister the consumer from the
+ /// map from consumerTag to consumer instance.
+ /// </summary>
+ /// <param name="consumerTag">the consumer tag, that was broker-generated</param>
+ internal void DeregisterConsumer(string consumerTag)
+ {
+ _consumers.Remove(consumerTag);
+ }
+ internal void RegisterProducer(long producerId, IMessagePublisher publisher)
+ {
+ _producers[producerId] = publisher;
+ }
+ internal void DeregisterProducer(long producerId)
+ {
+ _producers.Remove(producerId);
+ }
+ private long GetNextProducerId()
+ {
+ return ++_nextProducerId;
+ }
+ /**
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
+ * failover when the client has veoted resubscription.
+ *
+ * The caller of this method must already hold the failover mutex.
+ */
+ internal void MarkClosed()
+ {
+ SetClosed();
+ _connection.DeregisterSession(_channelId);
+ MarkClosedProducersAndConsumers();
+ }
+ private void MarkClosedProducersAndConsumers()
+ {
+ try
+ {
+ // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
+ CloseProducers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ try
+ {
+ MarkClosedConsumers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ }
+ private void MarkClosedConsumers()
+ {
+ if (_dispatcher != null)
+ {
+ _dispatcher.StopDispatcher();
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedConsumers = new ArrayList(_consumers.Values);
+ foreach (BasicMessageConsumer consumer in clonedConsumers)
+ {
+ consumer.MarkClosed();
+ }
+ // at this point the _consumers map will be empty
+ }
+ private void DoPurgeQueue(string queueName, bool noWait)
+ {
+ try
+ {
+ _logger.DebugFormat("PurgeQueue {0}", queueName);
+ AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
+ if (noWait)
+ _connection.ProtocolWriter.Write(purgeQueue);
+ else
+ _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody));
+ }
+ catch (AMQException)
+ {
+ throw;
+ }
+ }
+ /**
+ * Replays frame on fail over.
+ *
+ * @throws AMQException
+ */
+ internal void ReplayOnFailOver()
+ {
+ _logger.Debug(string.Format("Replaying frames for channel {0}", _channelId));
+ foreach (AMQFrame frame in _replayFrames)
+ {
+ _logger.Debug(string.Format("Replaying frame=[{0}]", frame));
+ _connection.ProtocolWriter.Write(frame);
+ }
+ }
+ /// <summary>
+ /// Callers must hold the failover mutex before calling this method.
+ /// </summary>
+ /// <param name="consumer"></param>
+ private void RegisterConsumer(BasicMessageConsumer consumer)
+ {
+ // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+ consumer.ConsumerTag = tag;
+ _consumers.Add(tag, consumer);
+ String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
+ consumer.Exclusive, consumer.AcknowledgeMode, tag, consumer.Browse);
+ }
+ internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
+ {
+ _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
+ queueName, exchangeName, routingKey, args));
+ AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
+ queueName, exchangeName,
+ routingKey, false, args);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody));
+ }
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0,
+ queueName, exchangeName,
+ routingKey, true, args));
+ }
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag, bool browse)
+ {
+ FieldTable args = new FieldTable();
+ if(browse)
+ {
+ args["x-filter-no-consume"] = true;
+ }
+ AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
+ queueName, tag, noLocal,
+ acknowledgeMode == AcknowledgeMode.NoAcknowledge,
+ exclusive, true, args);
+ _replayFrames.Add(basicConsume);
+ _connection.ProtocolWriter.Write(basicConsume);
+ return tag;
+ }
+ private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
+ {
+ try
+ {
+ _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
+ AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
+ if (noWait)
+ {
+ _connection.ProtocolWriter.Write(queueDelete);
+ }
+ else
+ {
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
+ }
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true));
+ }
+ catch (AMQException)
+ {
+ throw;
+ }
+ }
+ private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args)
+ {
+ _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}",
+ queueName, isDurable, isExclusive, isAutoDelete));
+ AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
+ isAutoDelete, false, (FieldTable) args);
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody));
+ }
+ // AS FIXME: wasnae me
+ _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
+ isAutoDelete, true, null));
+ }
+ // AMQP-level method.
+ private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName,
+ string exchangeClass, bool passive, bool durable,
+ bool autoDelete, bool xinternal, bool noWait, FieldTable args)
+ {
+ _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}",
+ _channelId, exchangeName, exchangeClass));
+ AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive,
+ durable, autoDelete, xinternal, noWait, args);
+ if (noWait)
+ {
+ lock (_connection.FailoverMutex)
+ {
+ _connection.ProtocolWriter.Write(declareExchange);
+ }
+ // AS FIXME: wasnae me
+ _replayFrames.Add(declareExchange);
+ }
+ else
+ {
+ throw new NotImplementedException("Don't use nowait=false with DeclareExchange");
+ //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
+ }
+ }
+ /**
+ * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
+ * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
+ * AUTO_ACK or similar.
+ *
+ * @param deliveryTag the tag of the last message to be acknowledged
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the
+ * delivery tag
+ */
+ internal void AcknowledgeMessage(ulong deliveryTag, bool multiple)
+ {
+ AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ }
+ // FIXME: lock FailoverMutex here?
+ _connection.ProtocolWriter.Write(ackFrame);
+ }
+ public void RejectMessage(ulong deliveryTag, bool requeue)
+ {
+ if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+ {
+ AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+ _connection.ProtocolWriter.Write(rejectFrame);
+ }
+ }
+ /// <summary>
+ /// Handle a message that bounced from the server, creating
+ /// the corresponding exception and notifying the connection about it
+ /// </summary>
+ /// <param name="message">Unprocessed message</param>
+ private void ReturnBouncedMessage(UnprocessedMessage message)
+ {
+ try
+ {
+ AbstractQmsMessage bouncedMessage =
+ _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies);
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ AMQException exception;
+ if (errorCode == AMQConstant.NO_CONSUMERS.Code)
+ {
+ exception = new AMQNoConsumersException(reason, bouncedMessage);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE.Code)
+ {
+ exception = new AMQNoRouteException(reason, bouncedMessage);
+ }
+ else
+ {
+ exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
+ }
+ _connection.ExceptionReceived(exception);
+ }
+ catch (Exception ex)
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
+ }
+ }
+ private void OnPrefetchLowMark(int count)
+ {
+ if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge)
+ {
+ _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count);
+ Suspend(false);
+ }
+ }
+ private void OnPrefetchHighMark(int count)
+ {
+ if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge)
+ {
+ _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count);
+ Suspend(true);
+ }
+ }
+ private void Suspend(bool suspend)
+ {
+ lock (_suspensionLock)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
+ }
+ _suspended = suspend;
+ AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend);
+ Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody));
+ }
+ }
+ /// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers.
+ /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message
+ /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new
+ /// message.
+ ///
+ /// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be
+ /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a
+ /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition.
+ ///
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/>
+ /// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/>
+ /// </table>
+ /// </summary>
+ ///
+ /// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message.
+ /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on
+ /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks>
+ ///
+ /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
+ /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks>
+ private class Dispatcher
+ {
+ /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary>
+ private int _stopped = 0;
+ /// <summary> The channel for which this is a dispatcher. </summary>
+ private AmqChannel _containingChannel;
+ /// <summary> Creates a dispatcher on the specified channel. </summary>
+ ///
+ /// <param name="containingChannel"> The channel on which this is a dispatcher. </param>
+ public Dispatcher(AmqChannel containingChannel)
+ {
+ _containingChannel = containingChannel;
+ }
+ /// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and
+ /// the connection of bounced messages.</summary>
+ public void RunDispatcher()
+ {
+ UnprocessedMessage message;
+ while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
+ {
+ if (message.DeliverBody != null)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag];
+ if (consumer == null)
+ {
+ _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
+ }
+ else
+ {
+ consumer.NotifyMessage(message, _containingChannel.ChannelId);
+ }
+ }
+ else
+ {
+ try
+ {
+ // Bounced message is processed here, away from the transport thread
+ AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry.
+ CreateMessage(0, false, message.ContentHeader, message.Bodies);
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+ _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + ".");
+ }
+ /// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary>
+ public void StopDispatcher()
+ {
+ Interlocked.Exchange(ref _stopped, 1);
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
new file mode 100644
index 0000000000..fdac5e75f2
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -0,0 +1,485 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+namespace Apache.Qpid.Client
+ public class BasicMessageConsumer : Closeable, IMessageConsumer
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageConsumer));
+ private bool _noLocal;
+ /** Holds the exclusive status flag for the consumers access to its queue. */
+ private bool _exclusive;
+ public bool Exclusive
+ {
+ get { return _exclusive; }
+ }
+ private bool _browse;
+ public bool Browse
+ {
+ get { return _browse; }
+ }
+ public bool NoLocal
+ {
+ get { return _noLocal; }
+ set { _noLocal = value; }
+ }
+ private AcknowledgeMode _acknowledgeMode;
+ public AcknowledgeMode AcknowledgeMode
+ {
+ get { return _acknowledgeMode; }
+ }
+ private MessageReceivedDelegate _messageListener;
+ private bool IsMessageListenerSet
+ {
+ get { return _messageListener != null; }
+ }
+ /// <summary>
+ /// The consumer tag allows us to close the consumer by sending a jmsCancel method to the
+ /// broker
+ /// </summary>
+ private string _consumerTag;
+ /// <summary>
+ /// We need to know the channel id when constructing frames
+ /// </summary>
+ private ushort _channelId;
+ private readonly string _queueName;
+ /// <summary>
+ /// Protects the setting of a messageListener
+ /// </summary>
+ private readonly object _syncLock = new object();
+ /// <summary>
+ /// We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ /// </summary>
+ private int _prefetchHigh;
+ /// <summary>
+ /// We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ /// </summary>
+ private int _prefetchLow;
+ /// <summary>
+ /// When true indicates that either a message listener is set or that
+ /// a blocking receive call is in progress
+ /// </summary>
+ private bool _receiving;
+ /// <summary>
+ /// Used in the blocking receive methods to receive a message from
+ /// the Channel thread.
+ /// </summary>
+ private readonly ConsumerProducerQueue _messageQueue = new ConsumerProducerQueue();
+ private MessageFactoryRegistry _messageFactory;
+ private AmqChannel _channel;
+ // <summary>
+ // Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+ // </summary>
+ //private long _lastDeliveryTag;
+ /// <summary>
+ /// Explicit list of all received but un-acked messages in a transaction. Used to ensure acking is completed when transaction is committed.
+ /// </summary>
+ private LinkedList<long> _receivedDeliveryTags;
+ /// <summary>
+ /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ /// </summary>
+ private int _outstanding;
+ /// <summary>
+ /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
+ /// Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ /// </summary>
+ private bool _dups_ok_acknowledge_send;
+ internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
+ MessageFactoryRegistry messageFactory, AmqChannel channel,
+ int prefetchHigh, int prefetchLow, bool exclusive, bool browse)
+ {
+ _channelId = channelId;
+ _queueName = queueName;
+ _noLocal = noLocal;
+ _messageFactory = messageFactory;
+ _channel = channel;
+ _acknowledgeMode = _channel.AcknowledgeMode;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
+ _exclusive = exclusive;
+ _browse = browse;
+ if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+ {
+ _receivedDeliveryTags = new LinkedList<long>();
+ }
+ }
+ #region IMessageConsumer Members
+ public MessageReceivedDelegate OnMessage
+ {
+ get
+ {
+ return _messageListener;
+ }
+ set
+ {
+ CheckNotClosed();
+ lock (_syncLock)
+ {
+ // If someone is already receiving
+ if (_messageListener != null && _receiving)
+ {
+ throw new InvalidOperationException("Another thread is already receiving...");
+ }
+ _messageListener = value;
+ _receiving = (_messageListener != null);
+ if (_receiving)
+ {
+ _logger.Debug("Message listener set for queue with name " + _queueName);
+ }
+ }
+ }
+ }
+ public IMessage Receive(long delay)
+ {
+ CheckNotClosed();
+ lock (_syncLock)
+ {
+ // If someone is already receiving
+ if (_receiving)
+ {
+ throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)...");
+ }
+ _receiving = true;
+ }
+ try
+ {
+ object o = _messageQueue.Dequeue(delay);
+ return ReturnMessageOrThrowAndPostDeliver(o);
+ }
+ finally
+ {
+ lock (_syncLock)
+ {
+ _receiving = false;
+ }
+ }
+ }
+ private IMessage ReturnMessageOrThrowAndPostDeliver(object o)
+ {
+ IMessage m = ReturnMessageOrThrow(o);
+ if (m != null)
+ {
+ PostDeliver(m);
+ }
+ return m;
+ }
+ public IMessage Receive()
+ {
+ return Receive(Timeout.Infinite);
+ }
+ public IMessage ReceiveNoWait()
+ {
+ return Receive(0);
+ }
+ #endregion
+ /// <summary>
+ /// We can get back either a Message or an exception from the queue. This method examines the argument and deals
+ /// with it by throwing it (if an exception) or returning it (in any other case).
+ /// </summary>
+ /// <param name="o">the object off the queue</param>
+ /// <returns> a message only if o is a Message</returns>
+ /// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not
+ /// a QpidMessagingException is created with the linked exception set appropriately</exception>
+ private IMessage ReturnMessageOrThrow(object o)
+ {
+ // errors are passed via the queue too since there is no way of interrupting the poll() via the API.
+ if (o is Exception)
+ {
+ Exception e = (Exception) o;
+ throw new QpidException("Message consumer forcibly closed due to error: " + e, e);
+ }
+ else
+ {
+ return (IMessage) o;
+ }
+ }
+ #region IDisposable Members
+ public void Dispose()
+ {
+ Close();
+ }
+ #endregion
+ public override void Close()
+ {
+ if (_closed == CLOSED)
+ {
+ return;
+ }
+ // FIXME: Don't we need FailoverSupport here (as we have SyncWrite). i.e. rather than just locking FailOverMutex
+ lock (_channel.Connection.FailoverMutex)
+ {
+ lock (_closingLock)
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ AMQFrame cancelFrame = BasicCancelBody.CreateAMQFrame(_channelId, _consumerTag, false);
+ try
+ {
+ _channel.Connection.ConvenientProtocolWriter.SyncWrite(
+ cancelFrame, typeof(BasicCancelOkBody));
+ }
+ catch (AMQException e)
+ {
+ _logger.Error("Error closing consumer: " + e, e);
+ throw new QpidException("Error closing consumer: " + e);
+ }
+ finally
+ {
+ DeregisterConsumer();
+ }
+ }
+ }
+ }
+ /**
+ * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
+ * of a message listener or a synchronous receive() caller.
+ *
+ * @param messageFrame the raw unprocessed mesage
+ * @param channelId channel on which this message was sent
+ */
+ internal void NotifyMessage(UnprocessedMessage messageFrame, int channelId)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
+ }
+ try
+ {
+ AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
+ messageFrame.DeliverBody.Redelivered,
+ messageFrame.ContentHeader,
+ messageFrame.Bodies);
+ _logger.Debug("Message is of type: " + jmsMessage.GetType().Name);
+ PreDeliver(jmsMessage);
+ if (IsMessageListenerSet)
+ {
+ // We do not need a lock around the test above, and the dispatch below as it is invalid
+ // for an application to alter an installed listener while the session is started.
+#if __MonoCS__
+ _messageListener(jmsMessage);
+ _messageListener.Invoke(jmsMessage);
+ PostDeliver(jmsMessage);
+ }
+ else
+ {
+ _messageQueue.Enqueue(jmsMessage);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Caught exception (dump follows) - ignoring...", e); // FIXME
+ }
+ }
+ internal void NotifyError(Exception cause)
+ {
+ lock (_syncLock)
+ {
+ SetClosed();
+ // we have no way of propagating the exception to a message listener - a JMS limitation - so we
+ // deal with the case where we have a synchronous receive() waiting for a message to arrive
+ if (_messageListener == null)
+ {
+ // offer only succeeds if there is a thread waiting for an item from the queue
+ _messageQueue.Enqueue(cause);
+ _logger.Debug("Passed exception to synchronous queue for propagation to receive()");
+ }
+ DeregisterConsumer();
+ }
+ }
+ private void SetClosed()
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ }
+ /// <summary>
+ /// Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean
+ /// case and in the case of an error occurring.
+ /// </summary>
+ internal void DeregisterConsumer()
+ {
+ _channel.DeregisterConsumer(_consumerTag);
+ }
+ public string ConsumerTag
+ {
+ get
+ {
+ return _consumerTag;
+ }
+ set
+ {
+ _consumerTag = value;
+ }
+ }
+ /**
+ * Called when you need to invalidate a consumer. Used for example when failover has occurred and the
+ * client has vetoed automatic resubscription.
+ * The caller must hold the failover mutex.
+ */
+ internal void MarkClosed()
+ {
+ SetClosed();
+ DeregisterConsumer();
+ }
+ public string QueueName
+ {
+ get { return _queueName; }
+ }
+ /// <summary>
+ /// Acknowledge up to last message delivered (if any). Used when commiting.
+ /// </summary>
+ internal void AcknowledgeDelivered()
+ {
+ foreach (long tag in _receivedDeliveryTags)
+ {
+ _channel.AcknowledgeMessage((ulong)tag, false);
+ }
+ _receivedDeliveryTags.Clear();
+ }
+ internal void RejectUnacked()
+ {
+ foreach (long tag in _receivedDeliveryTags)
+ {
+ _channel.RejectMessage((ulong)tag, true);
+ }
+ _receivedDeliveryTags.Clear();
+ }
+ private void PreDeliver(AbstractQmsMessage msg)
+ {
+ switch (AcknowledgeMode)
+ {
+ case AcknowledgeMode.PreAcknowledge:
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false);
+ break;
+ case AcknowledgeMode.ClientAcknowledge:
+ // We set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame.
+ //msg.setAMQSession(_session);
+ msg.Channel = _channel;
+ break;
+ }
+ }
+ private void PostDeliver(IMessage m)
+ {
+ AbstractQmsMessage msg = (AbstractQmsMessage) m;
+ switch (AcknowledgeMode)
+ {
+ case AcknowledgeMode.DupsOkAcknowledge:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+ if (_dups_ok_acknowledge_send)
+ {
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
+ }
+ break;
+ case AcknowledgeMode.AutoAcknowledge:
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
+ break;
+ case AcknowledgeMode.SessionTransacted:
+ _receivedDeliveryTags.AddLast(msg.DeliveryTag);
+ break;
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
new file mode 100644
index 0000000000..f33afc452e
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -0,0 +1,405 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client
+ public class BasicMessageProducer : Closeable, IMessagePublisher
+ {
+ protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+ /// <summary>
+ /// If true, messages will not get a timestamp.
+ /// </summary>
+ private bool _disableTimestamps;
+ /// <summary>
+ /// Priority of messages created by this producer.
+ /// </summary>
+ private int _messagePriority;
+ /// <summary>
+ /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ /// </summary>
+ private long _timeToLive;
+ /// <summary>
+ /// Delivery mode used for this producer.
+ /// </summary>
+ private DeliveryMode _deliveryMode;
+ private bool _immediate;
+ private bool _mandatory;
+ string _exchangeName;
+ string _routingKey;
+ /// <summary>
+ /// Default encoding used for messages produced by this producer.
+ /// </summary>
+ private string _encoding;
+ /// <summary>
+ /// Default encoding used for message produced by this producer.
+ /// </summary>
+ private string _mimeType;
+ /// <summary>
+ /// True if this producer was created from a transacted session
+ /// </summary>
+ private bool _transacted;
+ private ushort _channelId;
+ /// <summary>
+ /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+ /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+ /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+ /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+ /// </summary>
+ private long _producerId;
+ /// <summary>
+ /// The session used to create this producer
+ /// </summary>
+ private AmqChannel _channel;
+ public BasicMessageProducer(string exchangeName, string routingKey,
+ bool transacted,
+ ushort channelId,
+ AmqChannel channel,
+ long producerId,
+ DeliveryMode deliveryMode,
+ long timeToLive,
+ bool immediate,
+ bool mandatory,
+ int priority)
+ {
+ _exchangeName = exchangeName;
+ _routingKey = routingKey;
+ _transacted = transacted;
+ _channelId = channelId;
+ _channel = channel;
+ _producerId = producerId;
+ _deliveryMode = deliveryMode;
+ _timeToLive = timeToLive;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _messagePriority = priority;
+ _channel.RegisterProducer(producerId, this);
+ }
+ #region IMessagePublisher Members
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _deliveryMode;
+ }
+ set
+ {
+ CheckNotClosed();
+ _deliveryMode = value;
+ }
+ }
+ public string ExchangeName
+ {
+ get { return _exchangeName; }
+ }
+ public string RoutingKey
+ {
+ get { return _routingKey; }
+ }
+ public bool DisableMessageID
+ {
+ get
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ set
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ }
+ public bool DisableMessageTimestamp
+ {
+ get
+ {
+ CheckNotClosed();
+ return _disableTimestamps;
+ }
+ set
+ {
+ CheckNotClosed();
+ _disableTimestamps = value;
+ }
+ }
+ public int Priority
+ {
+ get
+ {
+ CheckNotClosed();
+ return _messagePriority;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 || value > 9 )
+ {
+ throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
+ }
+ _messagePriority = value;
+ }
+ }
+ public override void Close()
+ {
+ _logger.Debug("Closing producer " + this);
+ Interlocked.Exchange(ref _closed, CLOSED);
+ _channel.DeregisterProducer(_producerId);
+ }
+ public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ deliveryMode,
+ priority,
+ (uint)timeToLive,
+ _mandatory,
+ _immediate
+ );
+ }
+ public void Send(IMessage msg)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ _deliveryMode,
+ _messagePriority,
+ (uint)_timeToLive,
+ _mandatory,
+ _immediate
+ );
+ }
+ // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+ // to facilitate publishing messages to potentially non-existent recipients.
+ public void Send(IMessage msg, bool mandatory)
+ {
+ CheckNotClosed();
+ SendImpl(
+ _exchangeName,
+ _routingKey,
+ (AbstractQmsMessage)msg,
+ _deliveryMode,
+ _messagePriority,
+ (uint)_timeToLive,
+ mandatory,
+ _immediate
+ );
+ }
+ public long TimeToLive
+ {
+ get
+ {
+ CheckNotClosed();
+ return _timeToLive;
+ }
+ set
+ {
+ CheckNotClosed();
+ if ( value < 0 )
+ {
+ throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
+ }
+ _timeToLive = value;
+ }
+ }
+ #endregion
+ public string MimeType
+ {
+ get
+ {
+ CheckNotClosed();
+ return _mimeType;
+ }
+ set
+ {
+ CheckNotClosed();
+ _mimeType = value;
+ }
+ }
+ public string Encoding
+ {
+ get
+ {
+ CheckNotClosed();
+ return _encoding;
+ }
+ set
+ {
+ CheckNotClosed();
+ _encoding = value;
+ }
+ }
+ public void Dispose()
+ {
+ Close();
+ }
+ #region Message Publishing
+ private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+ {
+ // todo: handle session access ticket
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(
+ _channel.ChannelId, 0, exchangeName,
+ routingKey, mandatory, immediate
+ );
+ // fix message properties
+ if ( !_disableTimestamps )
+ {
+ message.Timestamp = DateTime.UtcNow.Ticks;
+ if (timeToLive != 0)
+ {
+ message.Expiration = message.Timestamp + timeToLive;
+ }
+ } else
+ {
+ message.Expiration = 0;
+ }
+ message.DeliveryMode = deliveryMode;
+ message.Priority = (byte)priority;
+ ByteBuffer payload = message.Data;
+ int payloadLength = payload.Limit;
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for ( int i = 0; i < contentBodies.Length; i++ )
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if ( contentBodies.Length > 0 && _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(
+ _channelId, AmqChannel.BASIC_CONTENT_TYPE, 0,
+ message.ContentHeaderProperties, (uint)payloadLength
+ );
+ if ( _logger.IsDebugEnabled )
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ lock ( _channel.Connection.FailoverMutex )
+ {
+ _channel.Connection.ProtocolWriter.Write(compositeFrame);
+ }
+ }
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(ByteBuffer payload)
+ {
+ if ( payload == null )
+ {
+ return null;
+ } else if ( payload.Remaining == 0 )
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ int framePayloadMax = (int)(_channel.Connection.MaximumFrameSize - 1);
+ int frameCount = CalculateContentBodyFrames(payload);
+ ContentBody[] bodies = new ContentBody[frameCount];
+ for ( int i = 0; i < frameCount; i++ )
+ {
+ int length = (payload.Remaining >= framePayloadMax)
+ ? framePayloadMax : payload.Remaining;
+ bodies[i] = new ContentBody(payload, (uint)length);
+ }
+ return bodies;
+ }
+ private int CalculateContentBodyFrames(ByteBuffer payload)
+ {
+ // we substract one from the total frame maximum size to account
+ // for the end of frame marker in a body frame
+ // (0xCE byte).
+ int frameCount;
+ if ( (payload == null) || (payload.Remaining == 0) )
+ {
+ frameCount = 0;
+ } else
+ {
+ int dataLength = payload.Remaining;
+ int framePayloadMax = (int)_channel.Connection.MaximumFrameSize - 1;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+ frameCount = (int)(dataLength / framePayloadMax) + lastFrame;
+ }
+ return frameCount;
+ }
+ #endregion // Message Publishing
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Closeable.cs b/qpid/dotnet/Qpid.Client/Client/Closeable.cs
new file mode 100644
index 0000000000..b9664ccea3
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Closeable.cs
@@ -0,0 +1,83 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Apache.Qpid.Messaging;
+namespace Apache.Qpid.Client
+ /// <summary>Closeable provides monitoring of the state of a closeable resource; whether it is open or closed. It also provides a lock on which
+ /// attempts to close the resource from multiple threads can be coordinated.
+ ///
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Close (and clean-up) a resource.
+ /// <tr><td> Monitor the state of a closeable resource.
+ /// <tr><td> Synchronous attempts to close resource from concurrent threads.
+ /// </table>
+ /// </summary>
+ ///
+ /// <remarks>Poor encapsulation of the close lock. Better to completely hide the implementation, such that there is a method, e.g., DoSingleClose,
+ /// that sub-classes implement. Guaranteed to only be called by one thread at once, and iff the object is not already closed. That is, multiple
+ /// simultaneous closes will result in a single call to the real close method. Put the wait and condition checking loop in this base class.
+ /// </remarks>
+ public abstract class Closeable : ICloseable
+ {
+ /// <summary> Constant representing the closed state. </summary>
+ protected const int CLOSED = 1;
+ /// <summary> Constant representing the open state. </summary>
+ protected const int NOT_CLOSED = 2;
+ /// <summary> Used to ensure orderly closing of the object. </summary>
+ protected readonly object _closingLock = new object();
+ /// <summary> Indicates the state of this resource; open or closed. </summary>
+ protected int _closed = NOT_CLOSED;
+ /// <summary>
+ /// Checks the not closed.
+ /// </summary>
+ ///
+ /// <remarks>Don't like check methods that throw exceptions. a) it can come as a surprise without checked exceptions, b) it limits the
+ /// callers choice, if the caller would prefer a boolean, c) it is not side-effect free programming, where such could be used. Get rid
+ /// of this and replace with boolean.</remarks>
+ protected void CheckNotClosed()
+ {
+ if (_closed == CLOSED)
+ {
+ throw new InvalidOperationException("Object " + ToString() + " has been closed");
+ }
+ }
+ /// <summary>Indicates whether this resource is closed.</summary>
+ /// <value><c>true</c> if closed; otherwise, <c>false</c>.</value>
+ public bool Closed
+ {
+ get
+ {
+ return _closed == CLOSED;
+ }
+ }
+ /// <summary> Close this resource. </summary>
+ public abstract void Close();
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs b/qpid/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs
new file mode 100644
index 0000000000..8d289fa956
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Configuration/AuthenticationConfigurationSectionHandler.cs
@@ -0,0 +1,84 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Collections.Specialized;
+using System.Configuration;
+using System.Text;
+using Apache.Qpid.Client.Security;
+using Apache.Qpid.Sasl.Mechanisms;
+namespace Apache.Qpid.Client.Configuration
+ public class AuthenticationConfigurationSectionHandler
+ : IConfigurationSectionHandler
+ {
+ public object Create(object parent, object configContext, System.Xml.XmlNode section)
+ {
+ NameValueSectionHandler handler = new NameValueSectionHandler();
+ OrderedHashTable schemes = new OrderedHashTable();
+ NameValueCollection options = (NameValueCollection)
+ handler.Create(parent, configContext, section);
+ if ( options != null )
+ {
+ foreach ( string key in options.Keys )
+ {
+ Type type = Type.GetType(options[key]);
+ if ( type == null )
+ throw new ConfigurationException(string.Format("Type '{0}' not found", key));
+ if ( !typeof(IAMQCallbackHandler).IsAssignableFrom(type) )
+ throw new ConfigurationException(string.Format("Type '{0}' does not implement IAMQCallbackHandler", key));
+ schemes.Add(key, type);
+ }
+ }
+ return schemes;
+ }
+ } // class AuthenticationConfigurationSectionHandler
+ public class OrderedHashTable : Hashtable
+ {
+ private ArrayList _keys = new ArrayList();
+ public IList OrderedKeys
+ {
+ get { return _keys; }
+ }
+ public override void Add(object key, object value)
+ {
+ base.Add(key, value);
+ _keys.Add(key);
+ }
+ public override void Remove(object key)
+ {
+ base.Remove(key);
+ _keys.Remove(key);
+ }
+ }
+} // namespace Apache.Qpid.Client.Configuration
diff --git a/qpid/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs b/qpid/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs
new file mode 100644
index 0000000000..b21486bfa8
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs
@@ -0,0 +1,83 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client
+ public class ConnectionTuneParameters
+ {
+ private uint _frameMax;
+ private ushort _channelMax;
+ private uint _hearbeat;
+ private uint _txnLimit;
+ public uint FrameMax
+ {
+ get
+ {
+ return _frameMax;
+ }
+ set
+ {
+ _frameMax = value;
+ }
+ }
+ public ushort ChannelMax
+ {
+ get
+ {
+ return _channelMax;
+ }
+ set
+ {
+ _channelMax = value;
+ }
+ }
+ public uint Heartbeat
+ {
+ get
+ {
+ return _hearbeat;
+ }
+ set
+ {
+ _hearbeat = value;
+ }
+ }
+ public uint TxnLimit
+ {
+ get
+ {
+ return _txnLimit;
+ }
+ set
+ {
+ _txnLimit = value;
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverException.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverException.cs
new file mode 100644
index 0000000000..7013746414
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverException.cs
@@ -0,0 +1,42 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+namespace Apache.Qpid.Client.Failover
+ /// <summary>
+ /// This exception is thrown when failover is taking place and we need to let other
+ /// parts of the client know about this.
+ /// </summary>
+ [Serializable]
+ class FailoverException : Exception
+ {
+ public FailoverException(String message) : base(message)
+ {
+ }
+ protected FailoverException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
new file mode 100644
index 0000000000..83c69b7d25
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
@@ -0,0 +1,175 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+namespace Apache.Qpid.Client.Failover
+ public class FailoverHandler
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverHandler));
+ private AMQConnection _connection;
+ /**
+ * Used where forcing the failover host
+ */
+ private String _host;
+ /**
+ * Used where forcing the failover port
+ */
+ private int _port;
+ public FailoverHandler(AMQConnection connection)
+ {
+ _connection = connection;
+ }
+ public void Run()
+ {
+ if (Thread.CurrentThread.IsBackground)
+ {
+ throw new InvalidOperationException("FailoverHandler must Run on a non-background thread.");
+ }
+ AMQProtocolListener pl = _connection.ProtocolListener;
+ pl.FailoverLatch = new ManualResetEvent(false);
+ // We wake up listeners. If they can handle failover, they will extend the
+ // FailoverSupport class and will in turn block on the latch until failover
+ // has completed before retrying the operation
+ _connection.ProtocolListener.PropagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+ // Since failover impacts several structures we protect them all with a single mutex. These structures
+ // are also in child objects of the connection. This allows us to manipulate them without affecting
+ // client code which runs in a separate thread.
+ lock (_connection.FailoverMutex)
+ {
+ _log.Info("Starting failover process");
+ // We switch in a new state manager temporarily so that the interaction to get to the "connection open"
+ // state works, without us having to terminate any existing "state waiters". We could theoretically
+ // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
+ // a slightly more complex state model therefore I felt it was worthwhile doing this.
+ AMQStateManager existingStateManager = _connection.ProtocolListener.StateManager;
+ _connection.ProtocolListener.StateManager = new AMQStateManager();
+ if (!_connection.FirePreFailover(_host != null))
+ {
+ _connection.ProtocolListener.StateManager = existingStateManager;
+ if (_host != null)
+ {
+ _connection.ExceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client"));
+ }
+ else
+ {
+ _connection.ExceptionReceived(new AMQDisconnectedException("Failover was vetoed by client"));
+ }
+ pl.FailoverLatch.Set();
+ pl.FailoverLatch = null;
+ return;
+ }
+ bool failoverSucceeded;
+ // when host is non null we have a specified failover host otherwise we all the client to cycle through
+ // all specified hosts
+ // if _host has value then we are performing a redirect.
+ if (_host != null)
+ {
+ // todo: fix SSL support!
+ failoverSucceeded = _connection.AttemptReconnection(_host, _port, null);
+ }
+ else
+ {
+ failoverSucceeded = _connection.AttemptReconnection();
+ }
+ // XXX: at this point it appears that we are going to set StateManager to existingStateManager in
+ // XXX: both paths of control.
+ if (!failoverSucceeded)
+ {
+ _connection.ProtocolListener.StateManager = existingStateManager;
+ _connection.ExceptionReceived(
+ new AMQDisconnectedException("Server closed connection and no failover " +
+ "was successful"));
+ }
+ else
+ {
+ _connection.ProtocolListener.StateManager = existingStateManager;
+ try
+ {
+ if (_connection.FirePreResubscribe())
+ {
+ _log.Info("Resubscribing on new connection");
+ _connection.ResubscribeChannels();
+ }
+ else
+ {
+ _log.Info("Client vetoed automatic resubscription");
+ }
+ _connection.FireFailoverComplete();
+ _connection.ProtocolListener.FailoverState = FailoverState.NOT_STARTED;
+ _log.Info("Connection failover completed successfully");
+ }
+ catch (Exception e)
+ {
+ _log.Info("Failover process failed - exception being propagated by protocol handler");
+ _connection.ProtocolListener.FailoverState = FailoverState.FAILED;
+ try
+ {
+ _connection.ProtocolListener.OnException(e);
+ }
+ catch (Exception ex)
+ {
+ _log.Error("Error notifying protocol session of error: " + ex, ex);
+ }
+ }
+ }
+ }
+ pl.FailoverLatch.Set();
+ }
+ public String getHost()
+ {
+ return _host;
+ }
+ public void setHost(String host)
+ {
+ _host = host;
+ }
+ public int getPort()
+ {
+ return _port;
+ }
+ public void setPort(int port)
+ {
+ _port = port;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverState.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverState.cs
new file mode 100644
index 0000000000..3058cdcd69
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverState.cs
@@ -0,0 +1,31 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.Failover
+ /// <summary>
+ /// Enumeration of failover states. Used to handle failover from within AMQProtocolHandler where MINA events need to be
+ /// dealt with and can happen during failover.
+ /// </summary>
+ enum FailoverState
+ {
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs
new file mode 100644
index 0000000000..afa5301f39
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs
@@ -0,0 +1,55 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+namespace Apache.Qpid.Client.Failover
+ public abstract class FailoverSupport
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverSupport));
+ public object execute(AMQConnection con)
+ {
+ // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding.
+ // Any method that can potentially block for any reason should use this class so that deadlock will not
+ // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners)
+ // that might be causing a block. When that happens, the exception is caught here and the mutex is released
+ // before waiting for the failover to complete (either successfully or unsuccessfully).
+ while (true)
+ {
+ con.ProtocolListener.BlockUntilNotFailingOver();
+ lock (con.FailoverMutex)
+ {
+ try
+ {
+ return operation();
+ }
+ catch (FailoverException e)
+ {
+ _log.Info("Failover exception caught during operation", e);
+ }
+ }
+ }
+ }
+ protected abstract object operation();
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs
new file mode 100644
index 0000000000..def1e78e8c
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs
@@ -0,0 +1,42 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Handler
+ public class BasicDeliverMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicDeliverMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ UnprocessedMessage msg = new UnprocessedMessage();
+ msg.DeliverBody = (BasicDeliverBody) evt.Method;
+ msg.ChannelId = evt.ChannelId;
+ _logger.Debug("New JmsDeliver method received");
+ evt.ProtocolSession.UnprocessedMessageReceived(msg);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
new file mode 100644
index 0000000000..f413dfc9c6
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
@@ -0,0 +1,44 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Handler
+ public class BasicReturnMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicReturnMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("New Basic.Return method received");
+ UnprocessedMessage msg = new UnprocessedMessage();
+ msg.DeliverBody = null;
+ msg.BounceBody = (BasicReturnBody) evt.Method;
+ msg.ChannelId = evt.ChannelId;
+ evt.ProtocolSession.UnprocessedMessageReceived(msg);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
new file mode 100644
index 0000000000..9ed09a0d01
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
@@ -0,0 +1,68 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Protocol;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Handler
+ public class ChannelCloseMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ChannelCloseMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ChannelClose method received");
+ ChannelCloseBody method = (ChannelCloseBody) evt.Method;
+ int errorCode = method.ReplyCode;
+ string reason = method.ReplyText;
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+ }
+ AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
+ evt.ProtocolSession.WriteFrame(frame);
+ if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
+ {
+ _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ throw new AMQNoConsumersException(reason);
+ if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ throw new AMQNoRouteException(reason);
+ if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code )
+ throw new AMQInvalidArgumentException(reason);
+ if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code )
+ throw new AMQInvalidRoutingKeyException(reason);
+ // any other
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+ }
+ evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs
new file mode 100644
index 0000000000..66cff3bc65
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs
@@ -0,0 +1,68 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Protocol;
+namespace Apache.Qpid.Client.Handler
+ public class ConnectionCloseMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ConnectionClose frame received");
+ ConnectionCloseBody method = (ConnectionCloseBody) evt.Method;
+ int errorCode = method.ReplyCode;
+ String reason = method.ReplyText;
+ // send CloseOK
+ evt.ProtocolSession.WriteFrame(ConnectionCloseOkBody.CreateAMQFrame(evt.ChannelId));
+ if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
+ {
+ if ( errorCode == AMQConstant.NOT_ALLOWED.Code )
+ {
+ _logger.Info("Authentication Error: " + Thread.CurrentThread.Name);
+ evt.ProtocolSession.CloseProtocolSession();
+ //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
+ stateManager.ChangeState(AMQState.CONNECTION_NOT_STARTED);
+ throw new AMQAuthenticationException(errorCode, reason);
+ } else
+ {
+ _logger.Info("Connection close received with error code " + errorCode);
+ throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ }
+ }
+ // this actually closes the connection in the case where it is not an error.
+ evt.ProtocolSession.CloseProtocolSession();
+ stateManager.ChangeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs
new file mode 100644
index 0000000000..038da15731
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs
@@ -0,0 +1,41 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Handler
+ public class ConnectionCloseOkHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseOkHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ConnectionCloseOk frame received");
+// ConnectionCloseOkBody method = (ConnectionCloseOkBody)evt.Method;
+ stateManager.ChangeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs
new file mode 100644
index 0000000000..a12e4ead60
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs
@@ -0,0 +1,35 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+namespace Apache.Qpid.Client.Handler
+ public class ConnectionOpenOkMethodHandler : IStateAwareMethodListener
+ {
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ stateManager.ChangeState(AMQState.CONNECTION_OPEN);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs
new file mode 100644
index 0000000000..08cc580b17
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs
@@ -0,0 +1,68 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+namespace Apache.Qpid.Client.Handler
+ public class ConnectionRedirectMethodHandler : IStateAwareMethodListener
+ {
+// private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionRedirectMethodHandler));
+ private const int DEFAULT_REDIRECT_PORT = 5672;
+ private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler();
+ public static ConnectionRedirectMethodHandler GetInstance()
+ {
+ return _handler;
+ }
+ private ConnectionRedirectMethodHandler()
+ {
+ }
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ /*_logger.Info("ConnectionRedirect frame received");
+ ConnectionRedirectBody method = (ConnectionRedirectBody) evt.Method;
+ // the host is in the form hostname:port with the port being optional
+ int portIndex = method.Host.IndexOf(':');
+ String host;
+ int port;
+ if (portIndex == -1)
+ {
+ host = method.Host;
+ }
+ else
+ {
+ host = method.Host.Substring(0, portIndex);
+ port = Int32.Parse(method.Host.Substring(portIndex + 1));
+ }
+ evt.ProtocolSession.Failover(host, port);*/
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs
new file mode 100644
index 0000000000..9333d4d0a6
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs
@@ -0,0 +1,60 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Sasl;
+namespace Apache.Qpid.Client.Handler
+ public class ConnectionSecureMethodHandler : IStateAwareMethodListener
+ {
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ ISaslClient saslClient = evt.ProtocolSession.SaslClient;
+ if ( saslClient == null )
+ {
+ throw new AMQException("No SASL client set up - cannot proceed with authentication");
+ }
+ ConnectionSecureBody body = (ConnectionSecureBody)evt.Method;
+ try
+ {
+ // Evaluate server challenge
+ byte[] response = saslClient.EvaluateChallenge(body.Challenge);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame responseFrame = ConnectionSecureOkBody.CreateAMQFrame(
+ evt.ChannelId, response);
+ evt.ProtocolSession.WriteFrame(responseFrame);
+ } catch ( SaslException e )
+ {
+ throw new AMQException("Error processing SASL challenge: " + e, e);
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs
new file mode 100644
index 0000000000..c54662286b
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs
@@ -0,0 +1,144 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using log4net;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.Security;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Sasl;
+namespace Apache.Qpid.Client.Handler
+ public class ConnectionStartMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(ConnectionStartMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ ConnectionStartBody body = (ConnectionStartBody) evt.Method;
+ AMQProtocolSession ps = evt.ProtocolSession;
+ try
+ {
+ if ( body.Mechanisms == null )
+ {
+ throw new AMQException("mechanism not specified in ConnectionStart method frame");
+ }
+ string mechanisms = Encoding.UTF8.GetString(body.Mechanisms);
+ string selectedMechanism = ChooseMechanism(mechanisms);
+ if ( selectedMechanism == null )
+ {
+ throw new AMQException("No supported security mechanism found, passed: " + mechanisms);
+ }
+ byte[] saslResponse = DoAuthentication(selectedMechanism, ps);
+ if (body.Locales == null)
+ {
+ throw new AMQException("Locales is not defined in Connection Start method");
+ }
+ string allLocales = Encoding.ASCII.GetString(body.Locales);
+ string[] locales = allLocales.Split(' ');
+ string selectedLocale;
+ if (locales != null && locales.Length > 0)
+ {
+ selectedLocale = locales[0];
+ }
+ else
+ {
+ throw new AMQException("No locales sent from server, passed: " + locales);
+ }
+ stateManager.ChangeState(AMQState.CONNECTION_NOT_TUNED);
+ FieldTable clientProperties = new FieldTable();
+ clientProperties["product"] = "Apache.Qpid.NET";
+ clientProperties["version"] = "1.0";
+ clientProperties["platform"] = GetFullSystemInfo();
+ clientProperties["instance"] = ps.ClientID;
+ AMQFrame frame = ConnectionStartOkBody.CreateAMQFrame(
+ evt.ChannelId, clientProperties, selectedMechanism,
+ saslResponse, selectedLocale);
+ ps.WriteFrame(frame);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException(_log, "Unable to decode data: " + e, e);
+ }
+ }
+ private string GetFullSystemInfo()
+ {
+ StringBuilder sysInfo = new StringBuilder();
+ // check if we're running on mono or .net
+ Type monoRuntime = Type.GetType("Mono.Runtime");
+ if ( monoRuntime != null )
+ sysInfo.Append("Mono");
+ else
+ sysInfo.Append(".NET");
+ sysInfo.Append(" ").Append(Environment.Version);
+ sysInfo.Append(", ").Append(Environment.OSVersion);
+ return sysInfo.ToString();
+ }
+ private string ChooseMechanism(string mechanisms)
+ {
+ return CallbackHandlerRegistry.Instance.ChooseMechanism(mechanisms);
+ }
+ private byte[] DoAuthentication(string selectedMechanism, AMQProtocolSession ps)
+ {
+ ISaslClient saslClient = Sasl.Sasl.CreateClient(
+ new string[] { selectedMechanism }, null, "AMQP", "localhost",
+ new Hashtable(), CreateCallbackHandler(selectedMechanism, ps)
+ );
+ if ( saslClient == null )
+ {
+ throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
+ selectedMechanism);
+ }
+ ps.SaslClient = saslClient;
+ try
+ {
+ return saslClient.HasInitialResponse ?
+ saslClient.EvaluateChallenge(new byte[0]) : null;
+ } catch ( Exception ex )
+ {
+ ps.SaslClient = null;
+ throw new AMQException("Unable to create SASL client", ex);
+ }
+ }
+ private IAMQCallbackHandler CreateCallbackHandler(string mechanism, AMQProtocolSession session)
+ {
+ Type type = CallbackHandlerRegistry.Instance.GetCallbackHandler(mechanism);
+ IAMQCallbackHandler handler =
+ (IAMQCallbackHandler)Activator.CreateInstance(type);
+ if ( handler == null )
+ throw new AMQException("Unable to create callback handler: " + mechanism);
+ handler.Initialize(session);
+ return handler;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
new file mode 100644
index 0000000000..15a1d908b7
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
@@ -0,0 +1,63 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Handler
+ public class ConnectionTuneMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionTuneMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ConnectionTune frame received");
+ ConnectionTuneBody frame = (ConnectionTuneBody) evt.Method;
+ AMQProtocolSession session = evt.ProtocolSession;
+ ConnectionTuneParameters parameters = session.ConnectionTuneParameters;
+ if (parameters == null)
+ {
+ parameters = new ConnectionTuneParameters();
+ }
+ _logger.Debug(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat));
+ parameters.FrameMax = frame.FrameMax;
+ parameters.Heartbeat = frame.Heartbeat;
+ session.ConnectionTuneParameters = parameters;
+ stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED);
+ session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame(
+ evt.ChannelId, frame.ChannelMax, frame.FrameMax, frame.Heartbeat));
+ session.WriteFrame(ConnectionOpenBody.CreateAMQFrame(
+ evt.ChannelId, session.AMQConnection.VirtualHost, null, true));
+ if (frame.Heartbeat > 0)
+ {
+ evt.ProtocolSession.AMQConnection.StartHeartBeatThread(frame.Heartbeat);
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
new file mode 100644
index 0000000000..70aa3e1078
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
@@ -0,0 +1,44 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Handler
+ public class QueueDeleteOkMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs b/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
new file mode 100644
index 0000000000..22db70575d
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
@@ -0,0 +1,44 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Handler
+ public class QueuePurgeOkMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler));
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
new file mode 100644
index 0000000000..e58de2ab96
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
@@ -0,0 +1,58 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Message
+ public class AMQMessage
+ {
+ protected IContentHeaderProperties _contentHeaderProperties;
+ /// <summary>
+ /// If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required
+ /// </summary>
+ protected AmqChannel _channel;
+ private long _deliveryTag;
+ public AMQMessage(IContentHeaderProperties properties, long deliveryTag)
+ {
+ _contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
+ }
+ public AMQMessage(IContentHeaderProperties properties)
+ : this(properties, -1)
+ {
+ }
+ public long DeliveryTag
+ {
+ get { return _deliveryTag; }
+ }
+ public AmqChannel Channel
+ {
+ get { return _channel; }
+ set { _channel = value; }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
new file mode 100644
index 0000000000..f352d62c11
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
@@ -0,0 +1,73 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Apache.Qpid.Framing;
+using log4net;
+using Apache.Qpid.Buffer;
+namespace Apache.Qpid.Client.Message
+ public abstract class AbstractQmsMessageFactory : IMessageFactory
+ {
+ public abstract AbstractQmsMessage CreateMessage(string mimeType);
+ private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory));
+ protected abstract AbstractQmsMessage CreateMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader);
+ protected AbstractQmsMessage CreateMessageWithBody(long messageNbr,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ ByteBuffer data;
+ // we optimise the non-fragmented case to avoid copying
+ if (bodies != null && bodies.Count == 1)
+ {
+ _logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")");
+ data = ((ContentBody)bodies[0]).Payload;
+ }
+ else
+ {
+ _logger.Debug("Fragmented message body (" + bodies.Count + " frames, bodySize=" + contentHeader.BodySize + ")");
+ data = ByteBuffer.Allocate((int)contentHeader.BodySize); // XXX: Is cast a problem?
+ foreach (ContentBody body in bodies) {
+ data.Put(body.Payload);
+ //body.Payload.Release();
+ }
+ data.Flip();
+ }
+ _logger.Debug("Creating message from buffer with position=" + data.Position + " and remaining=" + data.Remaining);
+ return CreateMessage(messageNbr, data, contentHeader);
+ }
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies);
+ msg.Redelivered = redelivered;
+ return msg;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
new file mode 100644
index 0000000000..34b47137e5
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -0,0 +1,694 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using log4net;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Buffer;
+namespace Apache.Qpid.Client.Message
+ public abstract class AbstractQmsMessage : AMQMessage, IMessage
+ {
+ private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
+ protected bool _redelivered;
+ protected ByteBuffer _data;
+ protected bool _readableMessage = false;
+ private QpidHeaders _headers;
+ protected AbstractQmsMessage(ByteBuffer data)
+ : base(new BasicContentHeaderProperties())
+ {
+ Init(data);
+ }
+ protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ : this(contentHeader, deliveryTag)
+ {
+ Init(data);
+ }
+ protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
+ {
+ Init(null);
+ }
+ private void Init(ByteBuffer data)
+ {
+ _data = data;
+ if ( _data != null )
+ {
+ _data.Acquire();
+ }
+ _readableMessage = (data != null);
+ if ( ContentHeaderProperties.Headers == null )
+ ContentHeaderProperties.Headers = new FieldTable();
+ _headers = new QpidHeaders(ContentHeaderProperties.Headers);
+ }
+ //
+ // Properties
+ //
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
+ public string MessageId
+ {
+ get
+ {
+ if (ContentHeaderProperties.MessageId == null)
+ {
+ ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
+ }
+ return ContentHeaderProperties.MessageId;
+ }
+ set { ContentHeaderProperties.MessageId = value; }
+ }
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
+ public long Timestamp
+ {
+ get
+ {
+ // TODO: look at ulong/long choice
+ return (long) ContentHeaderProperties.Timestamp;
+ }
+ set
+ {
+ ContentHeaderProperties.Timestamp = (ulong) value;
+ }
+ }
+ /// <summary>
+ /// The <see cref="CorrelationId"/> as a byte array.
+ /// </summary>
+ public byte[] CorrelationIdAsBytes
+ {
+ get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
+ set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
+ }
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
+ public string CorrelationId
+ {
+ get { return ContentHeaderProperties.CorrelationId; }
+ set { ContentHeaderProperties.CorrelationId = value; }
+ }
+ struct Dest
+ {
+ public string ExchangeName;
+ public string RoutingKey;
+ public Dest(string exchangeName, string routingKey)
+ {
+ ExchangeName = exchangeName;
+ RoutingKey = routingKey;
+ }
+ }
+ /// <summary>
+ /// Exchange name of the reply-to address
+ /// </summary>
+ public string ReplyToExchangeName
+ {
+ get
+ {
+ return ReadReplyToHeader().ExchangeName;
+ }
+ set
+ {
+ BindingURL dest = ReadReplyToHeader();
+ dest.ExchangeName = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+ /// <summary>
+ /// Routing key of the reply-to address
+ /// </summary>
+ public string ReplyToRoutingKey
+ {
+ get
+ {
+ return ReadReplyToHeader().RoutingKey;
+ }
+ set
+ {
+ BindingURL dest = ReadReplyToHeader();
+ dest.RoutingKey = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ byte b = ContentHeaderProperties.DeliveryMode;
+ switch (b)
+ {
+ case 1:
+ return DeliveryMode.NonPersistent;
+ case 2:
+ return DeliveryMode.Persistent;
+ default:
+ throw new QpidException("Illegal value for delivery mode in content header properties");
+ }
+ }
+ set
+ {
+ ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
+ }
+ }
+ /// <summary>
+ /// True, if this is a redelivered message
+ /// </summary>
+ public bool Redelivered
+ {
+ get { return _redelivered; }
+ set { _redelivered = value; }
+ }
+ /// <summary>
+ /// The message type name
+ /// </summary>
+ public string Type
+ {
+ get { return ContentHeaderProperties.Type; }
+ set { ContentHeaderProperties.Type = value; }
+ }
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
+ public long Expiration
+ {
+ get { return ContentHeaderProperties.Expiration; }
+ set { ContentHeaderProperties.Expiration = value; }
+ }
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ public byte Priority
+ {
+ get { return ContentHeaderProperties.Priority; }
+ set { ContentHeaderProperties.Priority = (byte) value; }
+ }
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
+ public string ContentType
+ {
+ get { return ContentHeaderProperties.ContentType; }
+ set { ContentHeaderProperties.ContentType = value; }
+ }
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
+ public string ContentEncoding
+ {
+ get { return ContentHeaderProperties.Encoding; }
+ set { ContentHeaderProperties.Encoding = value; }
+ }
+ /// <summary>
+ /// Headers of this message
+ /// </summary>
+ public IHeaders Headers
+ {
+ get { return _headers; }
+ }
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ public string UserId
+ {
+ get { return ContentHeaderProperties.UserId; }
+ set { ContentHeaderProperties.UserId = value; }
+ }
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ public string AppId
+ {
+ get { return ContentHeaderProperties.AppId; }
+ set { ContentHeaderProperties.AppId = value; }
+ }
+ /// <summary>
+ /// Intra-cluster routing identifier
+ /// </summary>
+ public string ClusterId
+ {
+ get { return ContentHeaderProperties.ClusterId; }
+ set { ContentHeaderProperties.ClusterId = value; }
+ }
+ /// <summary>
+ /// Return the raw byte array that is used to populate the frame when sending
+ /// the message.
+ /// </summary>
+ /// <value>a byte array of message data</value>
+ public ByteBuffer Data
+ {
+ get
+ {
+ if (_data != null)
+ {
+ if (!_readableMessage)
+ {
+ _data.Flip();
+ }
+ else
+ {
+ // Make sure we rewind the data just in case any method has moved the
+ // position beyond the start.
+ _data.Rewind();
+ }
+ }
+ return _data;
+ }
+ set
+ {
+ _data = value;
+ }
+ }
+ public void Acknowledge()
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_channel != null)
+ {
+ // we set multiple to true here since acknowledgement implies acknowledge of all count messages
+ // received on the session
+ _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
+ }
+ }
+ public abstract void ClearBodyImpl();
+ public void ClearBody()
+ {
+ ClearBodyImpl();
+ _readableMessage = false;
+ }
+ /// <summary>
+ /// Get a String representation of the body of the message. Used in the
+ /// toString() method which outputs this before message properties.
+ /// </summary>
+ /// <exception cref="QpidException"></exception>
+ public abstract string ToBodyString();
+ public override string ToString()
+ {
+ try
+ {
+ StringBuilder buf = new StringBuilder("Body:\n");
+ buf.Append(ToBodyString());
+ buf.Append("\nQmsTimestamp: ").Append(Timestamp);
+ buf.Append("\nQmsExpiration: ").Append(Expiration);
+ buf.Append("\nQmsPriority: ").Append(Priority);
+ buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
+ buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
+ buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
+ buf.Append("\nAMQ message number: ").Append(DeliveryTag);
+ buf.Append("\nProperties:");
+ if (ContentHeaderProperties.Headers == null)
+ {
+ buf.Append("<NONE>");
+ }
+ else
+ {
+ buf.Append(Headers.ToString());
+ }
+ return buf.ToString();
+ }
+ catch (Exception e)
+ {
+ return e.ToString();
+ }
+ }
+ public FieldTable PopulateHeadersFromMessageProperties()
+ {
+ if (ContentHeaderProperties.Headers == null)
+ {
+ return null;
+ }
+ else
+ {
+ //
+ // We need to convert every property into a String representation
+ // Note that type information is preserved in the property name
+ //
+ FieldTable table = new FieldTable();
+ foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
+ {
+ string propertyName = (string) entry.Key;
+ if (propertyName == null)
+ {
+ continue;
+ }
+ else
+ {
+ table[propertyName] = entry.Value.ToString();
+ }
+ }
+ return table;
+ }
+ }
+ public BasicContentHeaderProperties ContentHeaderProperties
+ {
+ get
+ {
+ return (BasicContentHeaderProperties) _contentHeaderProperties;
+ }
+ }
+ protected virtual void Reset()
+ {
+ _readableMessage = true;
+ }
+ public bool IsReadable
+ {
+ get { return _readableMessage; }
+ }
+ public bool isWritable
+ {
+ get { return !_readableMessage; }
+ }
+ protected void CheckReadable()
+ {
+ if ( !_readableMessage )
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+ /// <summary>
+ /// Decodes the replyto field if one is set.
+ ///
+ /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
+ /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
+ /// empty the replyto field is expected to being with ':'.
+ ///
+ /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
+ /// </summary>
+ ///
+ /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
+ private BindingURL ReadReplyToHeader()
+ {
+ string replyToEncoding = ContentHeaderProperties.ReplyTo;
+ //log.Debug("replyToEncoding = " + replyToEncoding);
+ BindingURL bindingUrl = new BindingURL(replyToEncoding);
+ //log.Debug("bindingUrl = " + bindingUrl.ToString());
+ return bindingUrl;
+ //log.Info("replyToEncoding = " + replyToEncoding);
+// if ( replyToEncoding == null )
+// {
+// return new Dest();
+// } else
+// {
+// // Split the replyto field on a ':'
+// string[] split = replyToEncoding.Split(':');
+// // Ensure that the replyto field argument only consisted of two parts.
+// if ( split.Length != 2 )
+// {
+// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
+// }
+// // Extract the exchange name and routing key from the split replyto field.
+// string exchangeName = split[0];
+// string[] split2 = split[1].Split('/');
+// string routingKey = split2[3];
+// return new Dest(exchangeName, routingKey);
+// }
+ }
+ private void WriteReplyToHeader(BindingURL dest)
+ {
+ string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
+ ContentHeaderProperties.ReplyTo = encodedDestination;
+ }
+ }
+ public class BindingURL
+ {
+ public readonly static string OPTION_EXCLUSIVE = "exclusive";
+ public readonly static string OPTION_AUTODELETE = "autodelete";
+ public readonly static string OPTION_DURABLE = "durable";
+ public readonly static string OPTION_CLIENTID = "clientid";
+ public readonly static string OPTION_SUBSCRIPTION = "subscription";
+ public readonly static string OPTION_ROUTING_KEY = "routingkey";
+ /// <summary> Holds the undecoded URL </summary>
+ string url;
+ /// <summary> Holds the decoded options. </summary>
+ IDictionary options = new Hashtable();
+ /// <summary> Holds the decoded exchange class. </summary>
+ string exchangeClass;
+ /// <summary> Holds the decoded exchange name. </summary>
+ string exchangeName;
+ /// <summary> Holds the destination name. </summary>
+ string destination;
+ /// <summary> Holds the decoded queue name. </summary>
+ string queueName;
+ /// <summary>
+ /// The binding URL has the format:
+ /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ /// </summary>
+ public BindingURL(string url)
+ {
+ this.url = url;
+ Parse();
+ }
+ public string Url { get { return url; } }
+ public string ExchangeClass
+ {
+ get { return exchangeClass; }
+ set { exchangeClass = value; }
+ }
+ public string ExchangeName
+ {
+ get { return exchangeName; }
+ set { exchangeName = value; }
+ }
+ public string QueueName
+ {
+ get { return queueName; }
+ set { queueName = value; }
+ }
+ public string DestinationName
+ {
+ get { return destination; }
+ set { destination = value; }
+ }
+ public string RoutingKey {
+ get { return (string)options[OPTION_ROUTING_KEY]; }
+ set { options[OPTION_ROUTING_KEY] = value; }
+ }
+ public bool ContainsOption(string key) { return options.Contains(key); }
+ public string ToString()
+ {
+ return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName +
+ ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
+ }
+ private void Parse()
+ {
+ Uri binding = new Uri(url);
+ // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
+ string exchangeClass = binding.Scheme;
+ if (exchangeClass == null)
+ {
+ url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
+ Parse();
+ return;
+ }
+ else
+ {
+ this.exchangeClass = exchangeClass;
+ }
+ // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
+ string exchangeName = binding.Host;
+ if (exchangeName == null)
+ {
+ if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
+ {
+ this.exchangeName = "";
+ }
+ }
+ else
+ {
+ this.exchangeName = exchangeName;
+ }
+ // Extract the destination and queue name.
+ if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
+ {
+ throw new UriFormatException("Destination or Queue required");
+ }
+ else
+ {
+ int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
+ if (slashOffset == -1)
+ {
+ throw new UriFormatException("Destination required");
+ }
+ else
+ {
+ String path = binding.AbsolutePath;
+ this.destination = path.Substring(1, slashOffset - 1);
+ this.queueName = path.Substring(slashOffset + 1);
+ }
+ }
+ ParseOptions(options, binding.Query);
+ // If the routing key is not set as an option, set it to the destination name.
+ if (!ContainsOption(OPTION_ROUTING_KEY))
+ {
+ options[OPTION_ROUTING_KEY] = destination;
+ }
+ }
+ /// <summary>
+ /// options looks like this
+ /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
+ /// </summary>
+ public static void ParseOptions(IDictionary optionMap, string options)
+ {
+ // Check that there really are some options to parse.
+ if ((options == null) || (options.IndexOf('=') == -1))
+ {
+ return;
+ }
+ int optionIndex = options.IndexOf('=');
+ string option = options.Substring(0, optionIndex);
+ int length = options.Length;
+ int nestedQuotes = 0;
+ // Holds the index of the final "'".
+ int valueIndex = optionIndex;
+ // Loop over all the options.Dest
+ while ((nestedQuotes > 0) || (valueIndex < length))
+ {
+ valueIndex++;
+ if (valueIndex >= length)
+ {
+ break;
+ }
+ if (options[valueIndex] == '\'')
+ {
+ if ((valueIndex + 1) < options.Length)
+ {
+ if ((options[valueIndex + 1] == '&') ||
+ (options[valueIndex + 1] == ',') ||
+ (options[valueIndex + 1] == ';') ||
+ (options[valueIndex + 1] == '\''))
+ {
+ nestedQuotes--;
+ if (nestedQuotes == 0)
+ {
+ // We've found the value of an option
+ break;
+ }
+ }
+ else
+ {
+ nestedQuotes++;
+ }
+ }
+ else
+ {
+ // We are at the end of the string
+ // Check to see if we are corectly closing quotes
+ if (options[valueIndex] == '\'')
+ {
+ nestedQuotes--;
+ }
+ break;
+ }
+ }
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
new file mode 100644
index 0000000000..bed379290f
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
@@ -0,0 +1,52 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Message
+ public interface IMessageFactory
+ {
+ /// <summary>
+ /// Create a message
+ /// </summary>
+ /// <param name="deliverTag">Delivery Tag</param>
+ /// <param name="messageNbr">Message Sequence Number</param>
+ /// <param name="redelivered">True if this is a redelivered message</param>
+ /// <param name="contentHeader">Content headers</param>
+ /// <param name="bodies">Message bodies</param>
+ /// <returns>The new message</returns>
+ /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
+ AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies);
+ /// <summary>
+ /// Creates the message.
+ /// </summary>
+ /// <param name="mimeType">Mime type to associate the new message with</param>
+ /// <returns>The new message</returns>
+ /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
+ AbstractQmsMessage CreateMessage(string mimeType);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
new file mode 100644
index 0000000000..fdb5e14aa6
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
@@ -0,0 +1,129 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+namespace Apache.Qpid.Client.Message
+ public class MessageFactoryRegistry
+ {
+ private readonly Hashtable _mimeToFactoryMap = new Hashtable();
+ private IMessageFactory _defaultFactory;
+ /// <summary>
+ /// Default factory to use for unknown message types
+ /// </summary>
+ public IMessageFactory DefaultFactory
+ {
+ get { return _defaultFactory; }
+ set { _defaultFactory = value; }
+ }
+ /// <summary>
+ /// Register a new message factory for a MIME type
+ /// </summary>
+ /// <param name="mimeType">Mime type to register</param>
+ /// <param name="mf"></param>
+ public void RegisterFactory(string mimeType, IMessageFactory mf)
+ {
+ if ( mf == null )
+ throw new ArgumentNullException("mf");
+ if ( mimeType == null || mimeType.Length == 0 )
+ throw new ArgumentNullException("mimeType");
+ _mimeToFactoryMap[mimeType] = mf;
+ }
+ /// <summary>
+ /// Remove a message factory
+ /// </summary>
+ /// <param name="mimeType">MIME type to unregister</param>
+ public void DeregisterFactory(string mimeType)
+ {
+ _mimeToFactoryMap.Remove(mimeType);
+ }
+ /// <summary>
+ /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+ /// concrete message type.
+ /// </summary>
+ /// <param name="messageNbr">the AMQ message id</param>
+ /// <param name="redelivered">true if redelivered</param>
+ /// <param name="contentHeader">the content header that was received</param>
+ /// <param name="bodies">a list of ContentBody instances</param>
+ /// <returns>the message.</returns>
+ /// <exception cref="AMQException"/>
+ /// <exception cref="QpidException"/>
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties)contentHeader.Properties;
+ if ( properties.ContentType == null )
+ {
+ properties.ContentType = "";
+ }
+ IMessageFactory mf = GetFactory(properties.ContentType);
+ return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
+ }
+ /// <summary>
+ /// Create a new message of the specified type
+ /// </summary>
+ /// <param name="mimeType">The Mime type</param>
+ /// <returns>The new message</returns>
+ public AbstractQmsMessage CreateMessage(string mimeType)
+ {
+ if ( mimeType == null || mimeType.Length == 0 )
+ throw new ArgumentNullException("mimeType");
+ IMessageFactory mf = GetFactory(mimeType);
+ return mf.CreateMessage(mimeType);
+ }
+ /// <summary>
+ /// Construct a new registry with the default message factories registered
+ /// </summary>
+ /// <returns>a message factory registry</returns>
+ public static MessageFactoryRegistry NewDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
+ mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
+ mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
+ mf.DefaultFactory = new QpidBytesMessageFactory();
+ return mf;
+ }
+ private IMessageFactory GetFactory(string mimeType)
+ {
+ IMessageFactory mf = (IMessageFactory)_mimeToFactoryMap[mimeType];
+ return mf != null ? mf : _defaultFactory;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
new file mode 100644
index 0000000000..fb3efb1b0f
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
@@ -0,0 +1,353 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Runtime.Serialization;
+using System.Text;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Buffer;
+namespace Apache.Qpid.Client.Message
+ [Serializable]
+ class MessageEOFException : QpidException
+ {
+ public MessageEOFException(string message) : base(message)
+ {
+ }
+ protected MessageEOFException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+ public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
+ {
+ private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
+ public QpidBytesMessage() : this(null)
+ {
+ }
+ /// <summary>
+ /// Construct a bytes message with existing data.
+ /// </summary>
+ /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in
+ /// write-only mode</param>
+ QpidBytesMessage(ByteBuffer data) : base(data)
+ {
+ // superclass constructor has instantiated a content header at this point
+ if (data == null)
+ {
+ _data = ByteBuffer.Allocate(DEFAULT_BUFFER_INITIAL_SIZE);
+ _data.IsAutoExpand = true;
+ }
+ }
+ internal QpidBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
+ // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
+ : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data)
+ {
+ }
+ public override void ClearBodyImpl()
+ {
+ _data.Clear();
+ }
+ public override string ToBodyString()
+ {
+ CheckReadable();
+ try
+ {
+ return GetText();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString());
+ }
+ }
+ private String GetText()
+ {
+ // this will use the default platform encoding
+ if (_data == null)
+ {
+ return null;
+ }
+ int pos = _data.Position;
+ _data.Rewind();
+ // one byte left is for the end of frame marker
+ if (_data.Remaining == 0)
+ {
+ // this is really redundant since pos must be zero
+ _data.Position = pos;
+ return null;
+ }
+ else
+ {
+ byte[] data = new byte[_data.Remaining];
+ _data.GetBytes(data);
+ return Encoding.UTF8.GetString(data);
+ }
+ }
+ public long BodyLength
+ {
+ get
+ {
+ CheckReadable();
+ return _data.Limit;
+ }
+ }
+ private void CheckWritable()
+ {
+ if (_readableMessage)
+ {
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ }
+ }
+ public bool ReadBoolean()
+ {
+ CheckReadable();
+ CheckAvailable(1);
+ return _data.GetByte() != 0;
+ }
+ public byte ReadByte()
+ {
+ CheckReadable();
+ CheckAvailable(1);
+ return _data.GetByte();
+ }
+ public short ReadSignedByte()
+ {
+ CheckReadable();
+ CheckAvailable(1);
+ return _data.GetSByte();
+ }
+ public short ReadShort()
+ {
+ CheckReadable();
+ CheckAvailable(2);
+ return _data.GetInt16();
+ }
+ public char ReadChar()
+ {
+ CheckReadable();
+ CheckAvailable(2);
+ return _data.GetChar();
+ }
+ public int ReadInt()
+ {
+ CheckReadable();
+ CheckAvailable(4);
+ return _data.GetInt32();
+ }
+ public long ReadLong()
+ {
+ CheckReadable();
+ CheckAvailable(8);
+ return _data.GetInt64();
+ }
+ public float ReadFloat()
+ {
+ CheckReadable();
+ CheckAvailable(4);
+ return _data.GetFloat();
+ }
+ public double ReadDouble()
+ {
+ CheckReadable();
+ CheckAvailable(8);
+ return _data.GetDouble();
+ }
+ public string ReadUTF()
+ {
+ CheckReadable();
+ // we check only for one byte since theoretically the string could be only a
+ // single byte when using UTF-8 encoding
+ CheckAvailable(1);
+ try
+ {
+ byte[] data = new byte[_data.Remaining];
+ _data.GetBytes(data);
+ return Encoding.UTF8.GetString(data);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+ public int ReadBytes(byte[] bytes)
+ {
+ if (bytes == null)
+ {
+ throw new ArgumentNullException("bytes");
+ }
+ CheckReadable();
+ int count = (_data.Remaining >= bytes.Length ? bytes.Length : _data.Remaining);
+ if (count == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ _data.GetBytes(bytes, 0, count);
+ return count;
+ }
+ }
+ public int ReadBytes(byte[] bytes, int maxLength)
+ {
+ if (bytes == null)
+ {
+ throw new ArgumentNullException("bytes");
+ }
+ if (maxLength > bytes.Length)
+ {
+ throw new ArgumentOutOfRangeException("maxLength must be >= 0");
+ }
+ CheckReadable();
+ int count = (_data.Remaining >= maxLength ? maxLength : _data.Remaining);
+ if (count == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ _data.GetBytes(bytes, 0, count);
+ return count;
+ }
+ }
+ public void WriteBoolean(bool b)
+ {
+ CheckWritable();
+ _data.Put(b ? (byte)1 : (byte)0);
+ }
+ public void WriteByte(byte b)
+ {
+ CheckWritable();
+ _data.Put(b);
+ }
+ public void WriteShort(short i)
+ {
+ CheckWritable();
+ _data.Put(i);
+ }
+ public void WriteChar(char c)
+ {
+ CheckWritable();
+ _data.Put(c);
+ }
+ public void WriteSignedByte(short value)
+ {
+ CheckWritable();
+ _data.Put(value);
+ }
+ public void WriteDouble(double value)
+ {
+ CheckWritable();
+ _data.Put(value);
+ }
+ public void WriteFloat(float value)
+ {
+ CheckWritable();
+ _data.Put(value);
+ }
+ public void WriteInt(int value)
+ {
+ CheckWritable();
+ _data.Put(value);
+ }
+ public void WriteLong(long value)
+ {
+ CheckWritable();
+ _data.Put(value);
+ }
+ public void WriteUTF(string value)
+ {
+ CheckWritable();
+ byte[] encodedData = Encoding.UTF8.GetBytes(value);
+ _data.Put(encodedData);
+ }
+ public void WriteBytes(byte[] bytes)
+ {
+ CheckWritable();
+ _data.Put(bytes);
+ }
+ public void WriteBytes(byte[] bytes, int offset, int length)
+ {
+ CheckWritable();
+ _data.Put(bytes, offset, length);
+ }
+ protected override void Reset()
+ {
+ base.Reset();
+ _data.Flip();
+ }
+ void IBytesMessage.Reset()
+ {
+ Reset();
+ }
+ /**
+ * Check that there is at least a certain number of bytes available to read
+ *
+ * @param len the number of bytes
+ * @throws MessageEOFException if there are less than len bytes available to read
+ */
+ private void CheckAvailable(int len)
+ {
+ if (_data.Remaining < len)
+ {
+ throw new MessageEOFException("Unable to read " + len + " bytes");
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
new file mode 100644
index 0000000000..3cc96cbddc
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
@@ -0,0 +1,75 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Buffer;
+namespace Apache.Qpid.Client.Message
+ public class QpidBytesMessageFactory : AbstractQmsMessageFactory
+ {
+ //protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr,
+ // ContentHeaderBody contentHeader,
+ // IList bodies)
+ //{
+ // byte[] data;
+ // // we optimise the non-fragmented case to avoid copying
+ // if (bodies != null && bodies.Count == 1)
+ // {
+ // data = ((ContentBody)bodies[0]).Payload;
+ // }
+ // else
+ // {
+ // data = new byte[(long)contentHeader.BodySize];
+ // int currentPosition = 0;
+ // foreach (ContentBody cb in bodies)
+ // {
+ // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
+ // currentPosition += cb.Payload.Length;
+ // }
+ // }
+ // return new QpidBytesMessage(messageNbr, data, contentHeader);
+ //}
+ //public override AbstractQmsMessage CreateMessage()
+ //{
+ // return new QpidBytesMessage();
+ //}
+ protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader)
+ {
+ return new QpidBytesMessage(deliveryTag, contentHeader, data);
+ }
+ public override AbstractQmsMessage CreateMessage(string mimeType)
+ {
+ QpidBytesMessage msg = new QpidBytesMessage();
+ msg.ContentType = mimeType;
+ return msg;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
new file mode 100644
index 0000000000..9ad1c26867
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
@@ -0,0 +1,233 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+namespace Apache.Qpid.Client.Message
+ internal class QpidHeaders : IHeaders
+ {
+ private FieldTable _headers;
+ public QpidHeaders(FieldTable headers)
+ {
+ if ( headers == null )
+ throw new ArgumentNullException("headers");
+ _headers = headers;
+ }
+ public bool Contains(string name)
+ {
+ CheckPropertyName(name);
+ return _headers.Contains(name);
+ }
+ public void Clear()
+ {
+ _headers.Clear();
+ }
+ public object this[string name]
+ {
+ get { return GetObject(name); }
+ set { SetObject(name, value); }
+ }
+ public bool GetBoolean(string name)
+ {
+ CheckPropertyName(name);
+ if ( Contains(name) )
+ return _headers.GetBoolean(name);
+ return false;
+ }
+ public void SetBoolean(string name, bool b)
+ {
+ CheckPropertyName(name);
+ _headers.SetBoolean(name, b);
+ }
+ public byte GetByte(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if ( Contains(propertyName) )
+ return _headers.GetByte(propertyName);
+ return 0;
+ }
+ public void SetByte(string propertyName, byte b)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetByte(propertyName, b);
+ }
+ // we have sbyte overloads to interoperate with java
+ // because the Java client/server uses signed bytes
+ // by default, while C#'s is unsigned
+ public sbyte GetSByte(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if ( Contains(propertyName) )
+ return _headers.GetSByte(propertyName);
+ return 0;
+ }
+ public void SetSByte(string propertyName, sbyte b)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetSByte(propertyName, b);
+ }
+ public short GetShort(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if ( Contains(propertyName) )
+ return _headers.GetInt16(propertyName);
+ return 0;
+ }
+ public void SetShort(string propertyName, short i)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetInt16(propertyName, i);
+ }
+ public int GetInt(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if ( Contains(propertyName) )
+ return _headers.GetInt32(propertyName);
+ return 0;
+ }
+ public void SetInt(string propertyName, int i)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetInt32(propertyName, i);
+ }
+ public long GetLong(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if ( Contains(propertyName) )
+ return _headers.GetInt64(propertyName);
+ return 0;
+ }
+ public void SetLong(string propertyName, long l)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetInt64(propertyName, l);
+ }
+ public float GetFloat(String propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if ( Contains(propertyName) )
+ return _headers.GetFloat(propertyName);
+ return 0f;
+ }
+ public void SetFloat(string propertyName, float f)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetFloat(propertyName, f);
+ }
+ public double GetDouble(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if ( Contains(propertyName) )
+ return _headers.GetDouble(propertyName);
+ return 0;
+ }
+ public void SetDouble(string propertyName, double v)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetDouble(propertyName, v);
+ }
+ public string GetString(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ return _headers.GetString(propertyName);
+ }
+ public void SetString(string propertyName, string value)
+ {
+ CheckPropertyName(propertyName);
+ _headers.SetString(propertyName, value);
+ }
+ public object GetObject(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ return _headers[propertyName];
+ }
+ public void SetObject(string propertyName, object value)
+ {
+ CheckPropertyName(propertyName);
+ _headers[propertyName] = value;
+ }
+ private static void CheckPropertyName(string propertyName)
+ {
+ if ( propertyName == null )
+ {
+ throw new ArgumentException("Property name must not be null");
+ } else if ( "".Equals(propertyName) )
+ {
+ throw new ArgumentException("Property name must not be the empty string");
+ }
+ }
+ public override string ToString()
+ {
+ StringBuilder buf = new StringBuilder("{");
+ int i = 0;
+ foreach ( DictionaryEntry entry in _headers )
+ {
+ ++i;
+ if ( i > 1 )
+ {
+ buf.Append(", ");
+ }
+ string propertyName = (string)entry.Key;
+ if ( propertyName == null )
+ {
+ buf.Append("\nInternal error: Property with NULL key defined");
+ } else
+ {
+ buf.Append(propertyName);
+ buf.Append(" = ").Append(entry.Value);
+ }
+ }
+ buf.Append("}");
+ return buf.ToString();
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
new file mode 100644
index 0000000000..24aef92aa5
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
@@ -0,0 +1,115 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Buffer;
+namespace Apache.Qpid.Client.Message
+ public class QpidTextMessage : AbstractQmsMessage, ITextMessage
+ {
+ private string _decodedValue = null;
+ private static Encoding DEFAULT_ENCODING = Encoding.UTF8;
+ internal QpidTextMessage() : this(null, null)
+ {
+ ContentEncoding = DEFAULT_ENCODING.BodyName;
+ }
+ internal QpidTextMessage(ByteBuffer data, String encoding) : base(data)
+ {
+ ContentEncoding = encoding;
+ }
+ internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ :base(deliveryTag, contentHeader, data)
+ {
+ }
+ public override void ClearBodyImpl()
+ {
+ if (_data != null)
+ {
+ _data.Release();
+ }
+ _data = null;
+ _decodedValue = null;
+ }
+ public override string ToBodyString()
+ {
+ return Text;
+ }
+ public string Text
+ {
+ get
+ {
+ if (_data == null && _decodedValue == null)
+ {
+ return null;
+ }
+ else if (_decodedValue != null)
+ {
+ return _decodedValue;
+ }
+ else
+ {
+ _data.Rewind();
+ // Read remaining bytes.
+ byte[] bytes = new byte[_data.Remaining];
+ _data.GetBytes(bytes);
+ // Convert to string based on encoding.
+ if (ContentHeaderProperties.Encoding != null)
+ {
+ // throw ArgumentException if the encoding is not supported
+ _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(bytes);
+ }
+ else
+ {
+ _decodedValue = DEFAULT_ENCODING.GetString(bytes);
+ }
+ return _decodedValue;
+ }
+ }
+ set
+ {
+ byte[] bytes;
+ if (ContentHeaderProperties.Encoding == null)
+ {
+ bytes = Encoding.Default.GetBytes(value);
+ }
+ else
+ {
+ // throw ArgumentException if the encoding is not supported
+ bytes = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
+ }
+ _data = ByteBuffer.Wrap(bytes);
+ _decodedValue = value;
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
new file mode 100644
index 0000000000..79871e85ca
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
@@ -0,0 +1,40 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Message
+ public class QpidTextMessageFactory : AbstractQmsMessageFactory
+ {
+ public override AbstractQmsMessage CreateMessage(string mimeType)
+ {
+ QpidTextMessage msg = new QpidTextMessage();
+ msg.ContentType = mimeType;
+ return msg;
+ }
+ protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader)
+ {
+ return new QpidTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.Properties, data);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs b/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs
new file mode 100644
index 0000000000..4317ef3474
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs
@@ -0,0 +1,57 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using log4net;
+namespace Apache.Qpid.Client.Message
+ /// <summary>
+ /// Raised when a message body is received unexpectedly by the client. This typically occurs when the
+ /// length of bodies received does not match with the declared length in the content header.
+ /// </summary>
+ [Serializable]
+ public class UnexpectedBodyReceivedException : AMQException
+ {
+ public UnexpectedBodyReceivedException(ILog logger, string msg, Exception t)
+ : base(logger, msg, t)
+ {
+ }
+ public UnexpectedBodyReceivedException(ILog logger, string msg)
+ : base(logger, msg)
+ {
+ }
+ public UnexpectedBodyReceivedException(ILog logger, int errorCode, string msg)
+ : base(logger, errorCode, msg)
+ {
+ }
+ protected UnexpectedBodyReceivedException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
new file mode 100644
index 0000000000..d329712334
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
@@ -0,0 +1,57 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Message
+ public class UnprocessedMessage
+ {
+ private ulong _bytesReceived = 0;
+ public BasicDeliverBody DeliverBody;
+ public BasicReturnBody BounceBody;
+ public ushort ChannelId;
+ public ContentHeaderBody ContentHeader;
+ /// <summary>
+ /// List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
+ /// </summary>
+ /// TODO: write and use linked list class
+ public IList Bodies = new ArrayList();
+ public void ReceiveBody(ContentBody body)
+ {
+ Bodies.Add(body);
+ if (body.Payload != null)
+ {
+ _bytesReceived += (uint)body.Payload.Remaining;
+ }
+ }
+ public bool IsAllBodyDataReceived()
+ {
+ return _bytesReceived == ContentHeader.BodySize;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
new file mode 100644
index 0000000000..a7ce808862
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
@@ -0,0 +1,76 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Protocol
+ public class AMQMethodEvent
+ {
+ private AMQMethodBody _method;
+ private ushort _channelId;
+ private AMQProtocolSession _protocolSession;
+ public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
+ {
+ _channelId = channelId;
+ _method = method;
+ _protocolSession = protocolSession;
+ }
+ public AMQMethodBody Method
+ {
+ get
+ {
+ return _method;
+ }
+ }
+ public ushort ChannelId
+ {
+ get
+ {
+ return _channelId;
+ }
+ }
+ public AMQProtocolSession ProtocolSession
+ {
+ get
+ {
+ return _protocolSession;
+ }
+ }
+ public override String ToString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: ");
+ buf.Append("\nChannel id: ").Append(_channelId);
+ buf.Append("\nMethod: ").Append(_method);
+ return buf.ToString();
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
new file mode 100644
index 0000000000..c51538b70e
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
@@ -0,0 +1,318 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Client.Failover;
+using Apache.Qpid.Client.Protocol.Listener;
+using Apache.Qpid.Client.State;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Protocol
+ /// <summary>
+ /// AMQProtocolListener
+ ///
+ /// <p/>Fail-over state transition rules...
+ ///
+ /// <p/>The failover handler is created when the session is created since it needs a reference to the IoSession in order
+ /// to be able to send errors during failover back to the client application. The session won't be available in the case
+ /// when failing over due to a Connection.Redirect message from the broker.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Track fail over state of a connection.
+ /// <tr><td> Manage method listeners. <td> IAMQMethodListener
+ /// <tr><td> Receive notification of all IO errors on a connection. <td> IoHandler
+ /// <tr><td> Inform method listeners of all method events on a connection. <td> IAMQMethodListener
+ /// <tr><td> Inform method listeners of all error events on a connection. <td> IAMQMethodListener
+ /// </table>
+ ///
+ /// <b>Todo:</b> The broker will close the connection with no warning if authentication fails. This may result in the fail-over process being
+ /// triggered, when it should not be.
+ ///
+ /// </summary>
+ public class AMQProtocolListener : IProtocolListener
+ {
+ /// <summary>Used for debugging.</summary>
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener));
+ /// <summary>
+ /// Holds the failover handler for the connection. When a failure is detected, and the current failover state allows it,
+ /// the failover process is handed off to this handler.
+ /// </summary>
+ private FailoverHandler _failoverHandler;
+ /// <summary>Tracks the current fail-over state.</summary>
+ internal FailoverState _failoverState = FailoverState.NOT_STARTED;
+ internal FailoverState FailoverState
+ {
+ get { return _failoverState; }
+ set { _failoverState = value; }
+ }
+ internal ManualResetEvent FailoverLatch;
+ AMQConnection _connection;
+ AMQStateManager _stateManager;
+ public AMQStateManager StateManager
+ {
+ get { return _stateManager; }
+ set { _stateManager = value; }
+ }
+ private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList());
+ AMQProtocolSession _protocolSession = null;
+ private readonly Object _lock = new Object();
+ public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } }
+ public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager)
+ {
+ _connection = connection;
+ _stateManager = stateManager;
+ _failoverHandler = new FailoverHandler(connection);
+ }
+ public void OnMessage(IDataBlock message)
+ {
+ // Handle incorrect protocol version.
+ if (message is ProtocolInitiation)
+ {
+ string error = String.Format("Protocol mismatch - {0}", message.ToString());
+ AMQException e = new AMQProtocolHeaderException(error);
+ _log.Error("Closing connection because of protocol mismatch", e);
+ //_protocolSession.CloseProtocolSession();
+ _stateManager.Error(e);
+ return;
+ }
+ AMQFrame frame = (AMQFrame)message;
+ if (frame.BodyFrame is AMQMethodBody)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Method frame received: " + frame);
+ }
+ AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession);
+ try
+ {
+ bool wasAnyoneInterested = false;
+ lock (_frameListeners.SyncRoot)
+ {
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ }
+ }
+ catch (Exception e)
+ {
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ listener.Error(e);
+ }
+ }
+ }
+ else if (frame.BodyFrame is ContentHeaderBody)
+ {
+ _protocolSession.MessageContentHeaderReceived(frame.Channel,
+ (ContentHeaderBody)frame.BodyFrame);
+ }
+ else if (frame.BodyFrame is ContentBody)
+ {
+ _protocolSession.MessageContentBodyReceived(frame.Channel,
+ (ContentBody)frame.BodyFrame);
+ }
+ else if (frame.BodyFrame is HeartbeatBody)
+ {
+ _log.Debug("HeartBeat received");
+ }
+ }
+ /// <summary>
+ /// Receives notification of any IO exceptions on the connection.
+ ///
+ /// <p/>Upon receipt of a connection closed exception or any IOException, the fail-over process is attempted. If the fail-over fails, then
+ /// all method listeners and the application connection object are notified of the connection failure exception.
+ ///
+ /// <p/>All other exception types are propagated to all method listeners.
+ /// </summary>
+ public void OnException(Exception cause)
+ {
+ _log.Warn("public void OnException(Exception cause = " + cause + "): called");
+ // Ensure that the method listener set cannot be changed whilst this exception is propagated to all listeners. This also
+ // ensures that this exception is fully propagated to all listeners, before another one can be processed.
+ lock (_lock)
+ {
+ if (cause is AMQConnectionClosedException || cause is System.IO.IOException)
+ {
+ // Try a fail-over because the connection has failed.
+ FailoverState failoverState = AttemptFailover();
+ // Check if the fail-over has failed, in which case notify all method listeners of the exception.
+ // The application connection object is also notified of the failure of the connection with the exception.
+ if (failoverState == FailoverState.FAILED)
+ {
+ _log.Debug("Fail-over has failed. Notifying all method listeners of the exception.");
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ PropagateExceptionToWaiters(amqe);
+ _connection.ExceptionReceived(cause);
+ }
+ }
+ // Notify all method listeners of the exception.
+ else
+ {
+ PropagateExceptionToWaiters(cause);
+ _connection.ExceptionReceived(cause);
+ }
+ }
+ }
+ /// <summary>
+ /// Tries to fail-over the connection, if the connection policy will permit it, and the fail-over process has not yet been
+ /// started. If the connection does not allow fail-over then an exception will be raised. If a fail-over is already in progress
+ /// this method allows it to continue to run and will do nothing.
+ ///
+ /// <p/>This method should only be called when the connection has been remotely closed.
+ /// </summary>
+ ///
+ /// <returns>The fail-over state at the end of this attempt.</returns>
+ private FailoverState AttemptFailover()
+ {
+ _log.Debug("private void AttemptFailover(): called");
+ _log.Debug("_failoverState = " + _failoverState);
+ // Ensure that the connection stops sending heart beats, if it still is.
+ _connection.StopHeartBeatThread();
+ // Check that the connection policy allows fail-over to be attempted.
+ if (!_connection.IsFailoverAllowed)
+ {
+ _log.Debug("Connection does not allowed to failover");
+ _connection.ExceptionReceived(
+ new AMQDisconnectedException("Broker closed connection and reconnection is not permitted."));
+ }
+ // Check if connection was closed deliberately by the application, in which case no fail-over is attempted.
+ if (_connection.Closed)
+ {
+ return _failoverState;
+ }
+ // If the connection allows fail-over and fail-over has not yet been started, then it is started and the fail-over state is
+ // advanced to 'in progress'
+ if (_failoverState == FailoverState.NOT_STARTED && _connection.IsFailoverAllowed)
+ {
+ _log.Info("Starting the fail-over process.");
+ _failoverState = FailoverState.IN_PROGRESS;
+ StartFailoverThread();
+ }
+ return _failoverState;
+ }
+ /// <summary>
+ /// There are two cases where we have other threads potentially blocking for events to be handled by this
+ /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
+ /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
+ /// react appropriately.
+ ///
+ /// <param name="e">the exception to propagate</param>
+ /// </summary>
+ public void PropagateExceptionToWaiters(Exception e)
+ {
+ // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over.
+ _stateManager.Error(e);
+ lock ( _lock )
+ {
+ foreach ( IAMQMethodListener listener in _frameListeners )
+ {
+ listener.Error(e);
+ }
+ }
+ }
+ public void AddFrameListener(IAMQMethodListener listener)
+ {
+ lock ( _lock )
+ {
+ _frameListeners.Add(listener);
+ }
+ }
+ public void RemoveFrameListener(IAMQMethodListener listener)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Removing frame listener: " + listener.ToString());
+ }
+ lock ( _lock )
+ {
+ _frameListeners.Remove(listener);
+ }
+ }
+ public void BlockUntilNotFailingOver()
+ {
+ if (FailoverLatch != null)
+ {
+ FailoverLatch.WaitOne();
+ }
+ }
+ /// <summary>
+ /// "Failover" for redirection.
+ /// </summary>
+ /// <param name="host"></param>
+ /// <param name="port"></param>
+ public void Failover(string host, int port)
+ {
+ _failoverHandler.setHost(host);
+ _failoverHandler.setPort(port);
+ // see javadoc for FailoverHandler to see rationale for separate thread
+ StartFailoverThread();
+ }
+ private void StartFailoverThread()
+ {
+ Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run));
+ failoverThread.Name = "Failover";
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.IsBackground = false;
+ failoverThread.Start();
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
new file mode 100644
index 0000000000..1fb3d407eb
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
@@ -0,0 +1,267 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Apache.Qpid.Client.Message;
+using Apache.Qpid.Client.Transport;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Sasl;
+namespace Apache.Qpid.Client.Protocol
+ public class AMQProtocolSession
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession));
+ private readonly IProtocolWriter _protocolWriter;
+ private readonly IConnectionCloser _connectionCloser;
+ /// <summary>
+ /// Maps from the channel id to the AmqChannel that it represents.
+ /// </summary>
+ //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+ private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable());
+ //private ConcurrentMap _closingChannels = new ConcurrentHashMap();
+ private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable());
+ /// <summary>
+ /// Maps from a channel id to an unprocessed message. This is used to tie together the
+ /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+ /// </summary>
+ //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable());
+ private AMQConnection _connection;
+ public string ClientID { get { return _connection.ClientID; } }
+ public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection)
+ {
+ _protocolWriter = protocolWriter;
+ _connectionCloser = connectionCloser;
+ _connection = connection;
+ }
+ public void Init()
+ {
+ // start the process of setting up the connection. This is the first place that
+ // data is written to the server.
+ _protocolWriter.Write(new ProtocolInitiation());
+ }
+ public string Username
+ {
+ get
+ {
+ return AMQConnection.Username;
+ }
+ }
+ public string Password
+ {
+ get
+ {
+ return AMQConnection.Password;
+ }
+ }
+ ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too.
+ public ConnectionTuneParameters ConnectionTuneParameters
+ {
+ get
+ {
+ return _connectionTuneParameters;
+ }
+ set
+ {
+ _connectionTuneParameters = value;
+ AMQConnection con = AMQConnection;
+ con.SetMaximumChannelCount(value.ChannelMax);
+ con.MaximumFrameSize = value.FrameMax;
+ }
+ }
+ private ISaslClient _saslClient;
+ public ISaslClient SaslClient
+ {
+ get { return _saslClient; }
+ set { _saslClient = value; }
+ }
+ /// <summary>
+ /// Callback invoked from the BasicDeliverMethodHandler when a message has been received.
+ /// This is invoked on the MINA dispatcher thread.
+ /// </summary>
+ /// <param name="message">the unprocessed message</param>
+ /// <exception cname="AMQException">if this was not expected</exception>
+ public void UnprocessedMessageReceived(UnprocessedMessage message)
+ {
+ _channelId2UnprocessedMsgMap[message.ChannelId] = message;
+ }
+ public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader)
+ {
+ UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
+ if (msg == null)
+ {
+ throw new AMQException("Error: received content header without having received a JMSDeliver frame first");
+ }
+ if (msg.ContentHeader != null)
+ {
+ throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
+ }
+ msg.ContentHeader = contentHeader;
+ if (contentHeader.BodySize == 0)
+ {
+ DeliverMessageToAMQSession(channelId, msg);
+ }
+ }
+ public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody)
+ {
+ UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
+ if (msg == null)
+ {
+ throw new AMQException("Error: received content body without having received a BasicDeliver frame first");
+ }
+ if (msg.ContentHeader == null)
+ {
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ throw new AMQException("Error: received content body without having received a ContentHeader frame first");
+ }
+ try
+ {
+ msg.ReceiveBody(contentBody);
+ }
+ catch (UnexpectedBodyReceivedException e)
+ {
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ throw e;
+ }
+ if (msg.IsAllBodyDataReceived())
+ {
+ DeliverMessageToAMQSession(channelId, msg);
+ }
+ }
+ /// <summary>
+ /// Deliver a message to the appropriate session, removing the unprocessed message
+ /// from our map
+ /// <param name="channelId">the channel id the message should be delivered to</param>
+ /// <param name="msg"> the message</param>
+ private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg)
+ {
+ AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
+ channel.MessageReceived(msg);
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ }
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session. Equivalent
+ /// to calling getProtocolSession().write().
+ /// </summary>
+ /// <param name="frame">the frame to write</param>
+ public void WriteFrame(IDataBlock frame)
+ {
+ _protocolWriter.Write(frame);
+ }
+ public void AddSessionByChannel(ushort channelId, AmqChannel channel)
+ {
+ if (channel == null)
+ {
+ throw new ArgumentNullException("Attempt to register a null channel");
+ }
+ _logger.Debug("Add channel with channel id " + channelId);
+ _channelId2SessionMap[channelId] = channel;
+ }
+ public void RemoveSessionByChannel(ushort channelId)
+ {
+ _logger.Debug("Removing session with channelId " + channelId);
+ _channelId2SessionMap.Remove(channelId);
+ }
+ /// <summary>
+ /// Starts the process of closing a channel
+ /// </summary>
+ /// <param name="channel" the AmqChannel being closed</param>
+ public void CloseSession(AmqChannel channel)
+ {
+ _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId);
+ ushort channelId = channel.ChannelId;
+ // we need to know when a channel is closing so that we can respond
+ // with a channel.close frame when we receive any other type of frame
+ // on that channel
+ _closingChannels[channelId] = channel;
+ }
+ /// <summary>
+ /// Called from the ChannelClose handler when a channel close frame is received.
+ /// This method decides whether this is a response or an initiation. The latter
+ /// case causes the AmqChannel to be closed and an exception to be thrown if
+ /// appropriate.
+ /// </summary>
+ /// <param name="channelId">the id of the channel (session)</param>
+ /// <returns>true if the client must respond to the server, i.e. if the server
+ /// initiated the channel close, false if the channel close is just the server
+ /// responding to the client's earlier request to close the channel.</returns>
+ public bool ChannelClosed(ushort channelId, int code, string text)
+ {
+ // if this is not a response to an earlier request to close the channel
+ if (!_closingChannels.ContainsKey(channelId))
+ {
+ _closingChannels.Remove(channelId);
+ AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
+ channel.ClosedWithException(new AMQException(_logger, code, text));
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ public AMQConnection AMQConnection
+ {
+ get
+ {
+ return _connection;
+ }
+ }
+ public void CloseProtocolSession()
+ {
+ _logger.Debug("Closing protocol session");
+ _connectionCloser.Close();
+ }
+ internal string GenerateQueueName()
+ {
+ return "ntmp_" + System.Guid.NewGuid();
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs
new file mode 100644
index 0000000000..2f23a1571d
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/DefaultTimeouts.cs
@@ -0,0 +1,47 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+namespace Apache.Qpid.Client.Protocol
+ /// <summary>
+ /// Default timeout values for the protocol
+ /// </summary>
+ sealed class DefaultTimeouts
+ {
+ /// <summary>
+ /// Maximum number of milliseconds to wait for a state change
+ /// in the protocol's state machine
+ /// </summary>
+ public const int MaxWaitForState = 30* 1000;
+ /// <summary>
+ /// Maximum number of milliseconds to wait for a reply
+ /// frame when doing synchronous writer to the broker
+ /// </summary>
+ public const int MaxWaitForSyncWriter = 30 * 1000;
+ private DefaultTimeouts()
+ {
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
new file mode 100644
index 0000000000..e3298200c4
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
@@ -0,0 +1,27 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.Protocol
+ public interface IConnectionCloser
+ {
+ void Close();
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
new file mode 100644
index 0000000000..3b53f015f8
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
@@ -0,0 +1,36 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Apache.Qpid.Client.Protocol.Listener;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Protocol
+ public interface IProtocolListener
+ {
+ void OnMessage(IDataBlock message);
+ void OnException(Exception e);
+ // XXX: .NET way of doing listeners?
+ void AddFrameListener(IAMQMethodListener listener);
+ void RemoveFrameListener(IAMQMethodListener listener);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
new file mode 100644
index 0000000000..9cc9f8cee5
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
@@ -0,0 +1,110 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Protocol.Listener
+ public abstract class BlockingMethodFrameListener : IAMQMethodListener
+ {
+ private ManualResetEvent _resetEvent;
+ public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame);
+ /// <summary>
+ /// This is set if there is an exception thrown from processCommandFrame and the
+ /// exception is rethrown to the caller of blockForFrame()
+ /// </summary>
+ private volatile Exception _error;
+ protected ushort _channelId;
+ protected AMQMethodEvent _doneEvt = null;
+ public BlockingMethodFrameListener(ushort channelId)
+ {
+ _channelId = channelId;
+ _resetEvent = new ManualResetEvent(false);
+ }
+ /// <summary>
+ /// This method is called by the MINA dispatching thread. Note that it could
+ /// be called before BlockForFrame() has been called.
+ /// </summary>
+ /// <param name="evt">the frame event</param>
+ /// <returns>true if the listener has dealt with this frame</returns>
+ /// <exception cref="AMQException"></exception>
+ public bool MethodReceived(AMQMethodEvent evt)
+ {
+ AMQMethodBody method = evt.Method;
+ try
+ {
+ bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method);
+ if (ready)
+ {
+ _doneEvt = evt;
+ _resetEvent.Set();
+ }
+ return ready;
+ }
+ catch (AMQException e)
+ {
+ Error(e);
+ // we rethrow the error here, and the code in the frame dispatcher will go round
+ // each listener informing them that an exception has been thrown
+ throw e;
+ }
+ }
+ /// <summary>
+ /// This method is called by the thread that wants to wait for a frame.
+ /// </summary>
+ /// <param name="timeout">Set the number of milliseconds to wait</param>
+ public AMQMethodEvent BlockForFrame(int timeout)
+ {
+ _resetEvent.WaitOne(timeout, true);
+ //at this point the event will have been signalled. The error field might or might not be set
+ // depending on whether an error occurred
+ if (_error != null)
+ {
+ throw _error;
+ }
+ return _doneEvt;
+ }
+ /// <summary>
+ /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this
+ /// class to avoid code repetition but again is only called by the MINA dispatcher thread.
+ /// </summary>
+ /// <param name="e">the exception that caused the error</param>
+ public void Error(Exception e)
+ {
+ // set the error so that the thread that is blocking (in BlockForFrame())
+ // can pick up the exception and rethrow to the caller
+ _error = e;
+ _resetEvent.Set();
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
new file mode 100644
index 0000000000..b5450d00f7
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
@@ -0,0 +1,46 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+namespace Apache.Qpid.Client.Protocol.Listener
+ public interface IAMQMethodListener
+ {
+ /// <summary>
+ /// Invoked when a method frame has been received
+ /// <param name="evt">the event</param>
+ /// <returns>true if the handler has processed the method frame, false otherwise. Note
+ /// that this does not prohibit the method event being delivered to subsequent listeners
+ /// but can be used to determine if nobody has dealt with an incoming method frame.</param>
+ /// <exception cname="AMQException">if an error has occurred. This exception will be delivered
+ /// to all registered listeners using the error() method (see below) allowing them to
+ /// perform cleanup if necessary.</exception>
+ bool MethodReceived(AMQMethodEvent evt);
+ /// <summary>
+ /// Callback when an error has occurred. Allows listeners to clean up.
+ /// </summary>
+ /// <param name="e">the exception</param>
+ void Error(Exception e);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
new file mode 100644
index 0000000000..8cdc1dbba9
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
@@ -0,0 +1,42 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Protocol.Listener
+ public class SpecificMethodFrameListener : BlockingMethodFrameListener
+ {
+ private readonly Type _expectedClass;
+ public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId)
+ {
+ _expectedClass = expectedClass;
+ }
+ public override bool ProcessMethod(ushort channelId, AMQMethodBody frame)
+ {
+ return _expectedClass.IsInstanceOfType(frame);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
new file mode 100644
index 0000000000..11918f1ea2
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
@@ -0,0 +1,107 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Apache.Qpid.Client.Protocol.Listener;
+using Apache.Qpid.Client.Transport;
+using Apache.Qpid.Framing;
+using log4net;
+namespace Apache.Qpid.Client.Protocol
+ /// <summary>
+ /// A convenient interface to writing protocol frames.
+ /// </summary>
+ public class ProtocolWriter
+ {
+ private ILog _logger = LogManager.GetLogger(typeof(ProtocolWriter));
+ IProtocolWriter _protocolWriter;
+ IProtocolListener _protocolListener;
+ public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener)
+ {
+ _protocolWriter = protocolWriter;
+ _protocolListener = protocolListener;
+ }
+ public void WriteFrame(IDataBlock frame)
+ {
+ _protocolWriter.Write(frame);
+ }
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session and waits for
+ /// a particular response. Equivalent to calling getProtocolSession().write() then
+ /// waiting for the response.
+ /// </summary>
+ /// <param name="frame">the frame</param>
+ /// <param name="listener">the blocking listener. Note the calling thread will block.</param>
+ /// <param name="timeout">set the number of milliseconds to wait</param>
+ private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener, int timeout)
+ {
+ try
+ {
+ _protocolListener.AddFrameListener(listener);
+ _protocolWriter.Write(frame);
+ return listener.BlockForFrame(timeout);
+ }
+ finally
+ {
+ _protocolListener.RemoveFrameListener(listener);
+ }
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
+ }
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session and waits for
+ /// a particular response. Equivalent to calling getProtocolSession().write() then
+ /// waiting for the response.
+ /// </summary>
+ /// <param name="frame">the frame</param>
+ /// <param name="responseType">the type of method response</param>
+ public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType)
+ {
+ // TODO: If each frame knew it's response type, then the responseType argument would
+ // TODO: not be neccesary.
+ return SyncWrite(frame, responseType, DefaultTimeouts.MaxWaitForSyncWriter);
+ }
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session and waits for
+ /// a particular response. Equivalent to calling getProtocolSession().write() then
+ /// waiting for the response.
+ /// </summary>
+ /// <param name="frame">the frame</param>
+ /// <param name="responseType">the type of method response</param>
+ /// <param name="timeout">set the number of milliseconds to wait</param>
+ /// <returns>set the number of milliseconds to wait</returns>
+ public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType, int timeout)
+ {
+ return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType), timeout);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs b/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
new file mode 100644
index 0000000000..ede8966f37
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
@@ -0,0 +1,504 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Net;
+using System.Text;
+using System.Text.RegularExpressions;
+using log4net;
+using Apache.Qpid.Client.Qms;
+namespace Apache.Qpid.Client
+ public class URLHelper
+ {
+ public static char DEFAULT_OPTION_SEPERATOR = '&';
+ public static char ALTERNATIVE_OPTION_SEPARATOR = ',';
+ public static char BROKER_SEPARATOR = ';';
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="optionMap"></param>
+ /// <param name="options"></param>
+ public static void parseOptions(IDictionary optionMap, string options)
+ {
+ //options looks like this
+ //brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value''
+ if (options == null || options.IndexOf('=') == -1)
+ {
+ return;
+ }
+ int optionIndex = options.IndexOf('=');
+ String option = options.Substring(0, optionIndex);
+ int length = options.Length;
+ int nestedQuotes = 0;
+ // to store index of final "'"
+ int valueIndex = optionIndex;
+ //Walk remainder of url.
+ while (nestedQuotes > 0 || valueIndex < length)
+ {
+ valueIndex++;
+ if (valueIndex >= length)
+ {
+ break;
+ }
+ if (options[valueIndex] == '\'')
+ {
+ if (valueIndex + 1 < options.Length)
+ {
+ if (options[valueIndex + 1] == DEFAULT_OPTION_SEPERATOR ||
+ options[valueIndex + 1] == ALTERNATIVE_OPTION_SEPARATOR ||
+ options[valueIndex + 1] == BROKER_SEPARATOR ||
+ options[valueIndex + 1] == '\'')
+ {
+ nestedQuotes--;
+ // System.out.println(
+ // options + "\n" + "-" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1));
+ if (nestedQuotes == 0)
+ {
+ //We've found the value of an option
+ break;
+ }
+ }
+ else
+ {
+ nestedQuotes++;
+ // System.out.println(
+ // options + "\n" + "+" + nestedQuotes + ":" + getPositionString(valueIndex - 2, 1));
+ }
+ }
+ else
+ {
+ // We are at the end of the string
+ // Check to see if we are corectly closing quotes
+ if (options[valueIndex] == '\'')
+ {
+ nestedQuotes--;
+ }
+ break;
+ }
+ }
+ }
+ if (nestedQuotes != 0 || valueIndex < (optionIndex + 2))
+ {
+ int sepIndex = 0;
+ //Try and identify illegal separator character
+ if (nestedQuotes > 1)
+ {
+ for (int i = 0; i < nestedQuotes; i++)
+ {
+ sepIndex = options.IndexOf('\'', sepIndex);
+ sepIndex++;
+ }
+ }
+ if (sepIndex >= options.Length || sepIndex == 0)
+ {
+ parseError(valueIndex, "Unterminated option", options);
+ }
+ else
+ {
+ parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
+ options[sepIndex] + "'", options);
+ }
+ }
+ // optionIndex +2 to skip "='"
+ int sublen = valueIndex - (optionIndex + 2);
+ String value = options.Substring(optionIndex + 2, sublen);
+ optionMap.Add(option, value);
+ if (valueIndex < (options.Length - 1))
+ {
+ //Recurse to get remaining options
+ parseOptions(optionMap, options.Substring(valueIndex + 2));
+ }
+ }
+ public static void parseError(int index, String error, String url)
+ {
+ parseError(index, 1, error, url);
+ }
+ public static void parseError(int index, int length, String error, String url)
+ {
+ throw new UrlSyntaxException(url, error, index, length);
+ }
+ public static String printOptions(Hashtable options)
+ {
+ if (options.Count == 0)
+ {
+ return "";
+ }
+ else
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.Append('?');
+ foreach (String key in options.Keys)
+ {
+ sb.AppendFormat("{0}='{1}'{2}", key, options[key], DEFAULT_OPTION_SEPERATOR);
+ }
+ sb.Remove(sb.Length - 1, 1);
+ return sb.ToString();
+ }
+ }
+ }
+ public class QpidConnectionUrl
+ {
+ internal static IConnectionInfo FromUrl(string fullURL)
+ {
+ //_url = fullURL;
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ // _options = new HashMap<String, String>();
+ // _brokers = new LinkedList();
+ // _failoverOptions = new HashMap<String, String>();
+ // Connection URL format
+ //amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
+ // Options are of course optional except for requiring a single broker in the broker list.
+ try
+ {
+ Uri connection = new Uri(fullURL);
+ if (connection.Scheme == null || !(connection.Scheme.Equals(ConnectionUrlConstants.AMQ_PROTOCOL)))
+ {
+ throw new UrlSyntaxException(fullURL, "Not an AMQP URL");
+ }
+ if (connection.Host != null && connection.Host.Length > 0 && !connection.Host.Equals("default"))
+ {
+ connectionInfo.ClientName = connection.Host;
+ }
+ String userInfo = connection.UserInfo;
+ if (userInfo == null || userInfo.Length == 0)
+ {
+ URLHelper.parseError(ConnectionUrlConstants.AMQ_PROTOCOL.Length + 3,
+ "User information not found on url", fullURL);
+ }
+ else
+ {
+ parseUserInfo(userInfo, fullURL, connectionInfo);
+ }
+ String virtualHost = connection.AbsolutePath; // XXX: is AbsolutePath corrrect?
+ if (virtualHost != null && virtualHost.Length > 0)
+ {
+ connectionInfo.VirtualHost = virtualHost;
+ }
+ else
+ {
+ int authLength = connection.Authority.Length;
+ int start = ConnectionUrlConstants.AMQ_PROTOCOL.Length + 3;
+ int testIndex = start + authLength;
+ if (testIndex < fullURL.Length && fullURL[testIndex] == '?')
+ {
+ URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+ }
+ else
+ {
+ URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+ }
+ }
+ QpidConnectionInfo qci = (QpidConnectionInfo)connectionInfo;
+ string query = connection.Query;
+ if (query[0] == '?') query = query.Substring(1);
+ URLHelper.parseOptions(qci.GetOptions(), query);
+ processOptions(connectionInfo);
+ //Fragment is #string (not used)
+ //System.out.println(connection.getFragment());
+ return connectionInfo;
+ }
+ catch (UriFormatException uris)
+ {
+ throw uris;
+ // if (uris is UrlSyntaxException)
+ // {
+ // throw uris;
+ // }
+ //
+ // int slash = fullURL.IndexOf("\\");
+ //
+ // if (slash == -1)
+ // {
+ // URLHelper.parseError(uris.GetIndex(), uris.getReason(), uris.getInput());
+ // }
+ // else
+ // {
+ // if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+ // {
+ // URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ // }
+ // else
+ // {
+ // URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+ // }
+ // }
+ }
+ }
+ private static void parseUserInfo(String userinfo, string fullUrl, IConnectionInfo connectionInfo)
+ {
+ //user info = user:pass
+ int colonIndex = userinfo.IndexOf(':');
+ if (colonIndex == -1)
+ {
+ URLHelper.parseError(ConnectionUrlConstants.AMQ_PROTOCOL.Length + 3,
+ userinfo.Length, "Null password in user information not allowed.", fullUrl);
+ }
+ else
+ {
+ connectionInfo.Username = userinfo.Substring(0, colonIndex);
+ connectionInfo.Password = userinfo.Substring(colonIndex + 1);
+ }
+ }
+ private static void processOptions(IConnectionInfo connectionInfo)
+ {
+ string brokerlist = connectionInfo.GetOption(ConnectionUrlConstants.OPTIONS_BROKERLIST);
+ if (brokerlist != null)
+ {
+ //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+ Regex splitter = new Regex("" + URLHelper.BROKER_SEPARATOR);
+ foreach (string broker in splitter.Split(brokerlist))
+ {
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo(broker));
+ }
+ connectionInfo.SetOption(ConnectionUrlConstants.OPTIONS_BROKERLIST, null);
+ // _options.remove(OPTIONS_BROKERLIST);
+ }
+ string failover = connectionInfo.GetOption(ConnectionUrlConstants.OPTIONS_FAILOVER);
+ if (failover != null)
+ {
+ // failover='method?option='value',option='value''
+ int methodIndex = failover.IndexOf('?');
+ if (methodIndex > -1)
+ {
+ connectionInfo.FailoverMethod = failover.Substring(0, methodIndex);
+ QpidConnectionInfo qpidConnectionInfo = (QpidConnectionInfo)connectionInfo;
+ URLHelper.parseOptions(qpidConnectionInfo.GetFailoverOptions(),
+ failover.Substring(methodIndex + 1));
+ }
+ else
+ {
+ connectionInfo.FailoverMethod = failover;
+ }
+ connectionInfo.SetOption(ConnectionUrlConstants.OPTIONS_FAILOVER, null);
+ // _options.remove(OPTIONS_FAILOVER);
+ }
+ }
+ internal static IConnectionInfo FromUri(Uri uri)
+ {
+ return null; // FIXME
+ }
+ }
+ public class QpidConnectionInfo : IConnectionInfo
+ {
+ const string DEFAULT_VHOST = "/";
+ string _username = "guest";
+ string _password = "guest";
+ string _virtualHost = DEFAULT_VHOST;
+ string _failoverMethod = null;
+ IDictionary _failoverOptions = new Hashtable();
+ IDictionary _options = new Hashtable();
+ IList _brokerInfos = new ArrayList(); // List<BrokerInfo>
+ string _clientName = String.Format("{0}{1:G}", Dns.GetHostName(), DateTime.Now.Ticks);
+ public IDictionary GetFailoverOptions()
+ {
+ return _failoverOptions;
+ }
+ public IDictionary GetOptions()
+ {
+ return _options;
+ }
+ public static IConnectionInfo FromUrl(String url)
+ {
+ return QpidConnectionUrl.FromUrl(url);
+ }
+ public string AsUrl()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.AppendFormat("{0}://", ConnectionUrlConstants.AMQ_PROTOCOL);
+ if (_username != null)
+ {
+ sb.Append(_username);
+ if (_password != null)
+ {
+ sb.AppendFormat(":{0}", _password);
+ }
+ sb.Append("@");
+ }
+ sb.Append(_clientName);
+ sb.Append(_virtualHost);
+ sb.Append(OptionsToString());
+ return sb.ToString();
+ }
+ private String OptionsToString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.AppendFormat("?{0}='", ConnectionUrlConstants.OPTIONS_BROKERLIST);
+ foreach (IBrokerInfo broker in _brokerInfos)
+ {
+ sb.AppendFormat("{0};", broker);
+ }
+ sb.Remove(sb.Length - 1, 1);
+ sb.Append("'");
+ if (_failoverMethod != null)
+ {
+ sb.AppendFormat("{0}{1}='{2}{3}'", URLHelper.DEFAULT_OPTION_SEPERATOR,
+ ConnectionUrlConstants.OPTIONS_FAILOVER,
+ _failoverMethod,
+ URLHelper.printOptions((Hashtable)_failoverOptions));
+ }
+ return sb.ToString();
+ }
+ public string FailoverMethod
+ {
+ get { return _failoverMethod; }
+ set { _failoverMethod = value; }
+ }
+ public string GetFailoverOption(string key)
+ {
+ return (string)_failoverOptions[key];
+ }
+ public int BrokerCount
+ {
+ get { return _brokerInfos.Count; }
+ }
+ public IBrokerInfo GetBrokerInfo(int index)
+ {
+ return (IBrokerInfo)_brokerInfos[index];
+ }
+ public void AddBrokerInfo(IBrokerInfo brokerInfo)
+ {
+ if (!_brokerInfos.Contains(brokerInfo))
+ {
+ _brokerInfos.Add(brokerInfo);
+ }
+ }
+ public IList GetAllBrokerInfos()
+ {
+ return _brokerInfos;
+ }
+ public string ClientName
+ {
+ get { return _clientName; }
+ set { _clientName = value; }
+ }
+ public string Username
+ {
+ get { return _username; }
+ set { _username = value; }
+ }
+ public string Password
+ {
+ get { return _password; }
+ set { _password = value; }
+ }
+ public string VirtualHost
+ {
+ get { return _virtualHost; }
+ set {
+ _virtualHost = value;
+ if ( _virtualHost == null || _virtualHost.Length == 0 )
+ _virtualHost = DEFAULT_VHOST;
+ if ( _virtualHost[0] != '/' )
+ _virtualHost = '/' + _virtualHost;
+ }
+ }
+ public string GetOption(string key)
+ {
+ return (string)_options[key];
+ }
+ public void SetOption(string key, string value)
+ {
+ _options[key] = value;
+ }
+ public override string ToString()
+ {
+ return AsUrl();
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs
new file mode 100644
index 0000000000..9ac0381850
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs
@@ -0,0 +1,129 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Configuration;
+using System.Text;
+using Apache.Qpid.Sasl;
+using Apache.Qpid.Sasl.Mechanisms;
+using Apache.Qpid.Client.Configuration;
+namespace Apache.Qpid.Client.Security
+ /// <summary>
+ /// Helper class to map SASL mechanisms to our
+ /// internal ISaslCallbackHandler implementations.
+ /// </summary>
+ /// <remarks>
+ /// The set of configured callback handlers and their order
+ /// controls the selection of the SASL mechanism used for authentication.
+ /// <para>
+ /// You can either replace the default handler for CRAM-MD5 and PLAIN
+ /// authentication (the two default options) using the application
+ /// configuration file. Configuration is done by especifying the SASL
+ /// mechanism name (e.g PLAIN) and the type implementing the callback handler
+ /// used to provide any data required by the mechanism like username and password.
+ /// </para>
+ /// <para>
+ /// Callback handler types should implement the IAMQCallbackHandler interface.
+ /// </para>
+ /// <para>
+ /// New callbacks or authentication mechanisms can be configured like this:
+ /// </para>
+ /// <example><![CDATA[
+ /// <configuration>
+ /// <configSections>
+ /// <sectionGroup name="qpid.client">
+ /// <section name="authentication" type="Apache.Qpid.Client.Configuration.AuthenticationConfigurationSectionHandler, Apache.Qpid.Client"/>
+ /// </sectionGroup>
+ /// </configSections>
+ /// <qpid.client>
+ /// <authentication>
+ /// <add key="TEST" value="Apache.Qpid.Client.Tests.Security.TestCallbackHandler, Apache.Qpid.Client.Tests"/>
+ /// </authentication>
+ /// </qpid.client>
+ /// </configuration>
+ /// ]]></example>
+ /// </remarks>
+ public sealed class CallbackHandlerRegistry
+ {
+ private static CallbackHandlerRegistry _instance =
+ new CallbackHandlerRegistry();
+ private OrderedHashTable _mechanism2HandlerMap;
+ private string[] _mechanisms;
+ public static CallbackHandlerRegistry Instance
+ {
+ get { return _instance; }
+ }
+ public string[] Mechanisms
+ {
+ get { return _mechanisms; }
+ }
+ private CallbackHandlerRegistry()
+ {
+ _mechanism2HandlerMap = (OrderedHashTable)
+ ConfigurationSettings.GetConfig("qpid.client/authentication");
+ // configure default options if not available
+ if ( _mechanism2HandlerMap == null )
+ _mechanism2HandlerMap = new OrderedHashTable();
+ if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
+ if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
+ if ( !_mechanism2HandlerMap.Contains(CramMD5HexSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(CramMD5HexSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
+ if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(PlainSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
+ _mechanisms = new string[_mechanism2HandlerMap.Count];
+ _mechanism2HandlerMap.OrderedKeys.CopyTo(_mechanisms, 0);
+ }
+ public bool IsSupportedMechanism(string mechanism)
+ {
+ return _mechanism2HandlerMap.Contains(mechanism);
+ }
+ public string ChooseMechanism(string mechanisms)
+ {
+ IList mechs = mechanisms.Split(' ');
+ foreach ( string supportedMech in _mechanisms )
+ {
+ if ( mechs.Contains(supportedMech) )
+ return supportedMech;
+ }
+ return null;
+ }
+ public Type GetCallbackHandler(string mechanism)
+ {
+ return (Type)_mechanism2HandlerMap[mechanism];
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs b/qpid/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs
new file mode 100644
index 0000000000..6ff45be04a
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Security/IAMQCallbackHandler.cs
@@ -0,0 +1,35 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Sasl;
+namespace Apache.Qpid.Client.Security
+ public interface IAMQCallbackHandler : ISaslCallbackHandler
+ {
+ void Initialize(AMQProtocolSession session);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs b/qpid/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs
new file mode 100644
index 0000000000..743ade77c9
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Security/UsernamePasswordCallbackHandler.cs
@@ -0,0 +1,56 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Sasl;
+namespace Apache.Qpid.Client.Security
+ internal class UsernamePasswordCallbackHandler : IAMQCallbackHandler
+ {
+ private AMQProtocolSession _session;
+ public void Initialize(AMQProtocolSession session)
+ {
+ if ( session == null )
+ throw new ArgumentNullException("session");
+ _session = session;
+ }
+ public void Handle(ISaslCallback[] callbacks)
+ {
+ foreach ( ISaslCallback cb in callbacks )
+ {
+ if ( cb is NameCallback )
+ {
+ ((NameCallback)cb).Text = _session.Username;
+ } else if ( cb is PasswordCallback )
+ {
+ ((PasswordCallback)cb).Text = _session.Password;
+ }
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/SslOptions.cs b/qpid/dotnet/Qpid.Client/Client/SslOptions.cs
new file mode 100644
index 0000000000..4630121828
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/SslOptions.cs
@@ -0,0 +1,81 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Security.Cryptography.X509Certificates;
+namespace Apache.Qpid.Client
+ /// <summary>
+ /// Configures SSL-related options to connect to an AMQP broker.
+ /// </summary>
+ /// <remarks>
+ /// If the server certificate is not trusted by the client,
+ /// connection will fail. However, you can set the
+ /// <see cref="IgnoreValidationErrors"/> property to true
+ /// to ignore any certificate verification errors for debugging purposes.
+ /// </remarks>
+ public class SslOptions
+ {
+ private X509Certificate _clientCertificate;
+ private bool _ignoreValidationErrors;
+ /// <summary>
+ /// Certificate to present to the broker to authenticate
+ /// this client connection
+ /// </summary>
+ public X509Certificate ClientCertificate
+ {
+ get { return _clientCertificate; }
+ }
+ /// <summary>
+ /// If true, the validity of the broker certificate
+ /// will not be verified on connection
+ /// </summary>
+ public bool IgnoreValidationErrors
+ {
+ get { return _ignoreValidationErrors; }
+ }
+ /// <summary>
+ /// Initialize a new instance with default values
+ /// (No client certificate, don't ignore validation errors)
+ /// </summary>
+ public SslOptions()
+ {
+ }
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="clientCertificate">
+ /// Certificate to use to authenticate the client to the broker
+ /// </param>
+ /// <param name="ignoreValidationErrors">
+ /// If true, ignore any validation errors when validating the server certificate
+ /// </param>
+ public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors)
+ {
+ _clientCertificate = clientCertificate;
+ _ignoreValidationErrors = ignoreValidationErrors;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs
new file mode 100644
index 0000000000..67f8427fb2
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQState.cs
@@ -0,0 +1,35 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.State
+ public enum AMQState
+ {
+ ALL // all is a special state used in the state manager. It is not valid to be "in" the state "all".
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs
new file mode 100644
index 0000000000..a464bbb6f5
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs
@@ -0,0 +1,52 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.State
+ public class AMQStateChangedEvent
+ {
+ private readonly AMQState _oldState;
+ private readonly AMQState _newState;
+ public AMQStateChangedEvent(AMQState oldState, AMQState newState)
+ {
+ _oldState = oldState;
+ _newState = newState;
+ }
+ public AMQState OldState
+ {
+ get
+ {
+ return _oldState;
+ }
+ }
+ public AMQState NewState
+ {
+ get
+ {
+ return _newState;
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
new file mode 100644
index 0000000000..881e01e697
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
@@ -0,0 +1,251 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Apache.Qpid.Client.Handler;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Client.Protocol.Listener;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.State
+ public class AMQStateManager : IAMQMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager));
+ const bool InfoLoggingHack = true;
+ /// <summary>
+ /// The current state
+ /// </summary>
+ private AMQState _currentState;
+ /// <summary>
+ /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
+ /// The class must be a subclass of AMQFrame.
+ /// </summary>
+ private readonly IDictionary _state2HandlersMap;
+ private ArrayList _stateListeners;
+ private object _syncLock;
+ public AMQStateManager()
+ {
+ _syncLock = new object();
+ _state2HandlersMap = new Hashtable();
+ _stateListeners = ArrayList.Synchronized(new ArrayList(5));
+ _currentState = AMQState.CONNECTION_NOT_STARTED;
+ RegisterListeners();
+ }
+ private void RegisterListeners()
+ {
+ IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler();
+ IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler();
+ IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler();
+ IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler();
+ IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler();
+ IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler();
+ IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
+ IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
+ IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+ IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler();
+ IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler();
+ // We need to register a map for the null (i.e. all state) handlers otherwise you get
+ // a stack overflow in the handler searching code when you present it with a frame for which
+ // no handlers are registered.
+ _state2HandlersMap[AMQState.ALL] = new Hashtable();
+ {
+ Hashtable notStarted = new Hashtable();
+ notStarted[typeof(ConnectionStartBody)] = connectionStart;
+ notStarted[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted;
+ }
+ {
+ Hashtable notTuned = new Hashtable();
+ notTuned[typeof(ConnectionTuneBody)] = connectionTune;
+ notTuned[typeof(ConnectionSecureBody)] = connectionSecure;
+ notTuned[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned;
+ }
+ {
+ Hashtable notOpened = new Hashtable();
+ notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk;
+ notOpened[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened;
+ }
+ {
+ Hashtable open = new Hashtable();
+ open[typeof(ChannelCloseBody)] = channelClose;
+ open[typeof(ConnectionCloseBody)] = connectionClose;
+ open[typeof(BasicDeliverBody)] = basicDeliver;
+ open[typeof(BasicReturnBody)] = basicReturn;
+ open[typeof(QueueDeleteOkBody)] = queueDeleteOk;
+ open[typeof(QueuePurgeOkBody)] = queuePurgeOk;
+ _state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
+ }
+ {
+ Hashtable closing = new Hashtable();
+ closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk;
+ _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing;
+ }
+ }
+ public AMQState CurrentState
+ {
+ get
+ {
+ return _currentState;
+ }
+ }
+ /// <summary>
+ /// Changes the state.
+ /// </summary>
+ /// <param name="newState">The new state.</param>
+ /// <exception cref="AMQException">if there is an error changing state</exception>
+ public void ChangeState(AMQState newState)
+ {
+ if (InfoLoggingHack)
+ {
+ _logger.Debug("State changing to " + newState + " from old state " + _currentState);
+ }
+ _logger.Debug("State changing to " + newState + " from old state " + _currentState);
+ AMQState oldState = _currentState;
+ _currentState = newState;
+ lock ( _syncLock )
+ {
+ foreach ( IStateListener l in _stateListeners )
+ {
+ l.StateChanged(oldState, newState);
+ }
+ }
+ }
+ public void Error(Exception e)
+ {
+ _logger.Debug("State manager receive error notification: " + e);
+ lock ( _syncLock )
+ {
+ foreach ( IStateListener l in _stateListeners )
+ {
+ l.Error(e);
+ }
+ }
+ }
+ public bool MethodReceived(AMQMethodEvent evt)
+ {
+ _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType()));
+ IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method);
+ if (handler != null)
+ {
+ handler.MethodReceived(this, evt);
+ return true;
+ }
+ return false;
+ }
+ /// <summary>
+ /// Finds the state transition handler.
+ /// </summary>
+ /// <param name="currentState">State of the current.</param>
+ /// <param name="frame">The frame.</param>
+ /// <returns></returns>
+ /// <exception cref="IllegalStateTransitionException">if the state transition if not allowed</exception>
+ private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState,
+ AMQMethodBody frame)
+ {
+ Type clazz = frame.GetType();
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Looking for state transition handler for frame " + clazz);
+ }
+ IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState];
+ if (classToHandlerMap == null)
+ {
+ // if no specialised per state handler is registered look for a
+ // handler registered for "all" states
+ return FindStateTransitionHandler(AMQState.ALL, frame);
+ }
+ IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz];
+ if (handler == null)
+ {
+ if (currentState == AMQState.ALL)
+ {
+ _logger.Debug("No state transition handler defined for receiving frame " + frame);
+ return null;
+ }
+ else
+ {
+ // if no specialised per state handler is registered look for a
+ // handler registered for "all" states
+ return FindStateTransitionHandler(AMQState.ALL, frame);
+ }
+ }
+ else
+ {
+ return handler;
+ }
+ }
+ public void AddStateListener(IStateListener listener)
+ {
+ _logger.Debug("Adding state listener");
+ lock ( _syncLock )
+ {
+ _stateListeners.Add(listener);
+ }
+ }
+ public void RemoveStateListener(IStateListener listener)
+ {
+ lock ( _syncLock )
+ {
+ _stateListeners.Remove(listener);
+ }
+ }
+ public void AttainState(AMQState s)
+ {
+ if (_currentState != s)
+ {
+ StateWaiter sw = null;
+ try
+ {
+ _logger.Debug("Adding state wait to reach state " + s);
+ sw = new StateWaiter(s);
+ AddStateListener(sw);
+ sw.WaituntilStateHasChanged();
+ // at this point the state will have changed.
+ }
+ finally
+ {
+ RemoveStateListener(sw);
+ }
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs
new file mode 100644
index 0000000000..31e4b5046d
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs
@@ -0,0 +1,29 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Apache.Qpid.Client.State
+ public interface IAMQStateListener
+ {
+ void StateChanged(AMQStateChangedEvent evt);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs
new file mode 100644
index 0000000000..0874f39665
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs
@@ -0,0 +1,31 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Client.Protocol;
+namespace Apache.Qpid.Client.State
+ public interface IStateAwareMethodListener
+ {
+ void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs
new file mode 100644
index 0000000000..edd7382f93
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IStateListener.cs
@@ -0,0 +1,33 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+namespace Apache.Qpid.Client.State
+ public interface IStateListener
+ {
+ void StateChanged(AMQState oldState, AMQState newState);
+ void Error(Exception e);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs
new file mode 100644
index 0000000000..81de622617
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs
@@ -0,0 +1,74 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+namespace Apache.Qpid.Client.State
+ [Serializable]
+ public class IllegalStateTransitionException : AMQException
+ {
+ private AMQState _originalState;
+ private Type _frame;
+ public IllegalStateTransitionException(AMQState originalState, Type frame)
+ : base("No valid state transition defined for receiving frame " + frame +
+ " from state " + originalState)
+ {
+ _originalState = originalState;
+ _frame = frame;
+ }
+ protected IllegalStateTransitionException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ _originalState = (AMQState)info.GetValue("OriginalState", typeof(AMQState));
+ _frame = (Type)info.GetValue("FrameType", typeof(Type));
+ }
+ public AMQState OriginalState
+ {
+ get
+ {
+ return _originalState;
+ }
+ }
+ public Type FrameType
+ {
+ get
+ {
+ return _frame;
+ }
+ }
+ public override void GetObjectData(SerializationInfo info, StreamingContext context)
+ {
+ base.GetObjectData(info, context);
+ info.AddValue("OriginalState", OriginalState);
+ info.AddValue("FrameType", FrameType);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs
new file mode 100644
index 0000000000..e739d0cb44
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/State/StateWaiter.cs
@@ -0,0 +1,121 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Apache.Qpid.Client.Protocol;
+using log4net;
+namespace Apache.Qpid.Client.State
+ public class StateWaiter : IStateListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter));
+ private readonly AMQState _state;
+ private AMQState _newState;
+ private volatile bool _newStateAchieved;
+ private volatile Exception _exception;
+ private ManualResetEvent _resetEvent = new ManualResetEvent(false);
+ public StateWaiter(AMQState state)
+ {
+ _state = state;
+ }
+ public void StateChanged(AMQState oldState, AMQState newState)
+ {
+ _newState = newState;
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("stateChanged called");
+ }
+ if (_state == newState)
+ {
+ _newStateAchieved = true;
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("New state reached so notifying monitor");
+ }
+ _resetEvent.Set();
+ }
+ }
+ public void Error(Exception e)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("exceptionThrown called");
+ }
+ _exception = e;
+ _resetEvent.Set();
+ }
+ public void WaituntilStateHasChanged()
+ {
+ //
+ // The guard is required in case we are woken up by a spurious
+ // notify().
+ //
+ TimeSpan waitTime = TimeSpan.FromMilliseconds(DefaultTimeouts.MaxWaitForState);
+ DateTime waitUntilTime = DateTime.Now + waitTime;
+ while ( !_newStateAchieved
+ && _exception == null
+ && waitTime.TotalMilliseconds > 0 )
+ {
+ _logger.Debug("State not achieved so waiting...");
+ try
+ {
+ _resetEvent.WaitOne(waitTime, true);
+ }
+ finally
+ {
+ if (!_newStateAchieved)
+ {
+ waitTime = waitUntilTime - DateTime.Now;
+ }
+ }
+ }
+ if (_exception != null)
+ {
+ _logger.Debug("Throwable reached state waiter: " + _exception);
+ if (_exception is AMQException)
+ throw _exception;
+ else
+ throw new AMQException("Error: " + _exception, _exception);
+ }
+ if (!_newStateAchieved)
+ {
+ string error = string.Format("State not achieved within permitted time. Current state: {0}, desired state: {1}", _state, _newState);
+ _logger.Warn(error);
+ throw new AMQException(error);
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs b/qpid/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs
new file mode 100644
index 0000000000..dd0bb404cb
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs
@@ -0,0 +1,47 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Codec;
+using Apache.Qpid.Codec.Demux;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Transport
+ public class AMQProtocolProvider
+ {
+ private DemuxingProtocolCodecFactory _factory;
+ public AMQProtocolProvider()
+ {
+ _factory = new DemuxingProtocolCodecFactory();
+ _factory.Register(new AMQDataBlockEncoder());
+ _factory.Register(new AMQDataBlockDecoder());
+ _factory.Register(new ProtocolInitiation.Decoder());
+ }
+ public IProtocolCodecFactory CodecFactory
+ {
+ get
+ {
+ return _factory;
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
new file mode 100644
index 0000000000..1e217e755b
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
@@ -0,0 +1,111 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Codec;
+using Apache.Qpid.Codec.Support;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Transport
+ public class AmqpChannel : IProtocolChannel
+ {
+ // Warning: don't use this log for regular logging.
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel");
+ IByteChannel _byteChannel;
+ IProtocolEncoder _encoder;
+ IProtocolDecoder _decoder;
+ IProtocolDecoderOutput _decoderOutput;
+ private object _syncLock;
+ public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput)
+ {
+ _byteChannel = byteChannel;
+ _decoderOutput = decoderOutput;
+ _syncLock = new object();
+ AMQProtocolProvider protocolProvider = new AMQProtocolProvider();
+ IProtocolCodecFactory factory = protocolProvider.CodecFactory;
+ _encoder = factory.Encoder;
+ _decoder = factory.Decoder;
+ }
+ public void Read()
+ {
+ ByteBuffer buffer = _byteChannel.Read();
+ Decode(buffer);
+ }
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return _byteChannel.BeginRead(callback, state);
+ }
+ public void EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = _byteChannel.EndRead(result);
+ Decode(buffer);
+ }
+ public void Write(IDataBlock o)
+ {
+ // TODO: Refactor to decorator.
+ if (_protocolTraceLog.IsDebugEnabled)
+ {
+ _protocolTraceLog.Debug(String.Format("WRITE {0}", o));
+ }
+ // we should be doing an async write, but apparently
+ // the mentalis library doesn't queue async read/writes
+ // correctly and throws random IOException's. Stay sync for a while
+ //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
+ _byteChannel.Write(Encode(o));
+ }
+ // not used for now
+ //private void OnAsyncWriteDone(IAsyncResult result)
+ //{
+ // _byteChannel.EndWrite(result);
+ //}
+ private void Decode(ByteBuffer buffer)
+ {
+ // make sure we don't try to decode more than
+ // one buffer at the same time
+ lock ( _syncLock )
+ {
+ _decoder.Decode(buffer, _decoderOutput);
+ }
+ }
+ private ByteBuffer Encode(object o)
+ {
+ SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput();
+ _encoder.Encode(o, output);
+ return output.buffer;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
new file mode 100644
index 0000000000..35806f2a6e
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
@@ -0,0 +1,71 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Apache.Qpid.Buffer;
+namespace Apache.Qpid.Client.Transport
+ /// <summary>
+ /// Represents input/output channels that read
+ /// and write <see cref="ByteBuffer"/> instances
+ /// </summary>
+ public interface IByteChannel
+ {
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ ByteBuffer Read();
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ IAsyncResult BeginRead(AsyncCallback callback, object state);
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ ByteBuffer EndRead(IAsyncResult result);
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ void Write(ByteBuffer buffer);
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state);
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ void EndWrite(IAsyncResult result);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
new file mode 100644
index 0000000000..0b59ee8799
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
@@ -0,0 +1,32 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+namespace Apache.Qpid.Client.Transport
+ public interface IProtocolChannel : IProtocolWriter
+ {
+ void Read();
+ IAsyncResult BeginRead(AsyncCallback callback, object state);
+ void EndRead(IAsyncResult result);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs
new file mode 100644
index 0000000000..592dff3a19
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs
@@ -0,0 +1,29 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Transport
+ public interface IProtocolWriter
+ {
+ void Write(IDataBlock o);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs
new file mode 100644
index 0000000000..e0e890fc5a
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs
@@ -0,0 +1,38 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+namespace Apache.Qpid.Client.Transport
+ /// <summary>
+ /// Defines a way to introduce an arbitrary filtering
+ /// stream into the stream chain managed by <see cref="IoHandler"/>
+ /// </summary>
+ public interface IStreamFilter
+ {
+ /// <summary>
+ /// Creates a new filtering stream on top of another
+ /// </summary>
+ /// <param name="lowerStream">Next stream on the stack</param>
+ /// <returns>A new filtering stream</returns>
+ Stream CreateFilterStream(Stream lowerStream);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs
new file mode 100644
index 0000000000..693a9a9534
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs
@@ -0,0 +1,32 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client.Protocol;
+namespace Apache.Qpid.Client.Transport
+ public interface ITransport : IConnectionCloser
+ {
+ void Connect(IBrokerInfo broker, AMQConnection connection);
+ string LocalEndpoint { get; }
+ IProtocolWriter ProtocolWriter { get; }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/qpid/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
new file mode 100644
index 0000000000..0475236d92
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
@@ -0,0 +1,322 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Client.Protocol;
+namespace Apache.Qpid.Client.Transport
+ /// <summary>
+ /// Responsible for reading and writing
+ /// ByteBuffers from/to network streams, and handling
+ /// the stream filters
+ /// </summary>
+ public class IoHandler : IByteChannel, IDisposable
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
+ private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
+ private Stream _topStream;
+ private IProtocolListener _protocolListener;
+ private int _readBufferSize;
+ public int ReadBufferSize
+ {
+ get { return _readBufferSize; }
+ set { _readBufferSize = value; }
+ }
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="stream">Underlying network stream</param>
+ /// <param name="protocolListener">Protocol listener to report exceptions to</param>
+ public IoHandler(Stream stream, IProtocolListener protocolListener)
+ {
+ if ( stream == null )
+ throw new ArgumentNullException("stream");
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+ // initially, the stream at the top of the filter
+ // chain is the underlying network stream
+ _topStream = stream;
+ _protocolListener = protocolListener;
+ _readBufferSize = DEFAULT_BUFFER_SIZE;
+ }
+ /// <summary>
+ /// Adds a new filter on the top of the chain
+ /// </summary>
+ /// <param name="filter">Stream filter to put on top of the chain</param>
+ /// <remarks>
+ /// This should *only* be called during initialization. We don't
+ /// support changing the filter change after the first read/write
+ /// has been done and it's not thread-safe to boot!
+ /// </remarks>
+ public void AddFilter(IStreamFilter filter)
+ {
+ _topStream = filter.CreateFilterStream(_topStream);
+ }
+ #region IByteChannel Implementation
+ //
+ // IByteChannel Implementation
+ //
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ public ByteBuffer Read()
+ {
+ byte[] bytes = AllocateBuffer();
+ int numOctets = _topStream.Read(bytes, 0, bytes.Length);
+ return WrapByteArray(bytes, numOctets);
+ }
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ byte[] bytes = AllocateBuffer();
+ ReadData rd = new ReadData(callback, state, bytes);
+ // only put a callback if the caller wants one.
+ AsyncCallback myCallback = null;
+ if ( callback != null )
+ myCallback = new AsyncCallback(OnAsyncReadDone);
+ IAsyncResult result = _topStream.BeginRead(
+ bytes, 0, bytes.Length, myCallback,rd
+ );
+ return new WrappedAsyncResult(result, bytes);
+ }
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ WrappedAsyncResult theResult = (WrappedAsyncResult)result;
+ int bytesRead = _topStream.EndRead(theResult.InnerResult);
+ return WrapByteArray(theResult.Buffer, bytesRead);
+ }
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ public void Write(ByteBuffer buffer)
+ {
+ try
+ {
+ _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
+ }
+ catch (Exception e)
+ {
+ _log.Warn("Write caused exception", e);
+ _protocolListener.OnException(e);
+ }
+ }
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ try
+ {
+ return _topStream.BeginWrite(
+ buffer.Array, buffer.Position, buffer.Limit,
+ callback, state
+ );
+ } catch ( Exception e )
+ {
+ _log.Error("BeginWrite caused exception", e);
+ // not clear if an exception here should be propagated? we still
+ // need to propagate it upwards anyway!
+ _protocolListener.OnException(e);
+ throw;
+ }
+ }
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ public void EndWrite(IAsyncResult result)
+ {
+ try
+ {
+ _topStream.EndWrite(result);
+ } catch ( Exception e )
+ {
+ _log.Error("EndWrite caused exception", e);
+ // not clear if an exception here should be propagated?
+ _protocolListener.OnException(e);
+ //throw;
+ }
+ }
+ #endregion // IByteChannel Implementation
+ #region IDisposable Implementation
+ //
+ // IDisposable Implementation
+ //
+ public void Dispose()
+ {
+ if ( _topStream != null )
+ {
+ _topStream.Close();
+ }
+ }
+ #endregion // IDisposable Implementation
+ #region Private and Helper Classes/Methods
+ //
+ // Private and Helper Classes/Methods
+ //
+ private byte[] AllocateBuffer()
+ {
+ return new byte[ReadBufferSize];
+ }
+ private static ByteBuffer WrapByteArray(byte[] bytes, int size)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
+ byteBuffer.Limit = size;
+ byteBuffer.Flip();
+ return byteBuffer;
+ }
+ private static void OnAsyncReadDone(IAsyncResult result)
+ {
+ ReadData rd = (ReadData) result.AsyncState;
+ IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
+ rd.Callback(wrapped);
+ }
+ class ReadData
+ {
+ private object _state;
+ private AsyncCallback _callback;
+ private byte[] _buffer;
+ public object State
+ {
+ get { return _state; }
+ }
+ public AsyncCallback Callback
+ {
+ get { return _callback; }
+ }
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+ public ReadData(AsyncCallback callback, object state, byte[] buffer)
+ {
+ _callback = callback;
+ _state = state;
+ _buffer = buffer;
+ }
+ }
+ class WrappedAsyncResult : IAsyncResult
+ {
+ private IAsyncResult _innerResult;
+ private byte[] _buffer;
+ #region IAsyncResult Properties
+ //
+ // IAsyncResult Properties
+ //
+ public bool IsCompleted
+ {
+ get { return _innerResult.IsCompleted; }
+ }
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _innerResult.AsyncWaitHandle; }
+ }
+ public object AsyncState
+ {
+ get { return _innerResult.AsyncState; }
+ }
+ public bool CompletedSynchronously
+ {
+ get { return _innerResult.CompletedSynchronously; }
+ }
+ #endregion // IAsyncResult Properties
+ public IAsyncResult InnerResult
+ {
+ get { return _innerResult; }
+ }
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+ public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
+ {
+ if ( result == null )
+ throw new ArgumentNullException("result");
+ if ( buffer == null )
+ throw new ArgumentNullException("buffer");
+ _innerResult = result;
+ _buffer = buffer;
+ }
+ }
+ #endregion // Private and Helper Classes/Methods
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
new file mode 100644
index 0000000000..9fa313152f
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
@@ -0,0 +1,60 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Codec;
+using Apache.Qpid.Framing;
+using log4net;
+namespace Apache.Qpid.Client.Transport
+ /// <summary>
+ /// <see cref="IProtocolDecoderOutput"/> implementation that forwards
+ /// each <see cref="IDataBlock"/> as it is decoded to the
+ /// protocol listener
+ /// </summary>
+ internal class ProtocolDecoderOutput : IProtocolDecoderOutput
+ {
+ private IProtocolListener _protocolListener;
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel");
+ public ProtocolDecoderOutput(IProtocolListener protocolListener)
+ {
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+ _protocolListener = protocolListener;
+ }
+ public void Write(object message)
+ {
+ IDataBlock block = message as IDataBlock;
+ if ( block != null )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", block));
+ _protocolListener.OnMessage(block);
+ }
+ }
+ }
+} // namespace Apache.Qpid.Client.Transport
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs b/qpid/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs
new file mode 100644
index 0000000000..a1aa889ba0
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs
@@ -0,0 +1,40 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Codec;
+namespace Apache.Qpid.Client.Transport
+ public class SingleProtocolEncoderOutput : IProtocolEncoderOutput
+ {
+ public ByteBuffer buffer;
+ public void Write(ByteBuffer buf)
+ {
+ if (buffer != null)
+ {
+ throw new InvalidOperationException("{0} does not allow the writing of more than one buffer");
+ }
+ buffer = buf;
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
new file mode 100644
index 0000000000..f336d8a80a
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
@@ -0,0 +1,150 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.IO;
+using System.Threading;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client.Protocol;
+using Apache.Qpid.Codec;
+using Apache.Qpid.Framing;
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+ /// <summary>
+ /// TCP Socket transport supporting both
+ /// SSL and non-SSL connections.
+ /// </summary>
+ public class BlockingSocketTransport : ITransport
+ {
+ // Configuration variables.
+ IProtocolListener _protocolListener;
+ // Runtime variables.
+ private ISocketConnector _connector;
+ private IoHandler _ioHandler;
+ private AmqpChannel _amqpChannel;
+ private ManualResetEvent _stopEvent;
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _amqpChannel; }
+ }
+ public string LocalEndpoint
+ {
+ get { return _connector.LocalEndpoint; }
+ }
+ /// <summary>
+ /// Connect to the specified broker
+ /// </summary>
+ /// <param name="broker">The broker to connect to</param>
+ /// <param name="connection">The AMQ connection</param>
+ public void Connect(IBrokerInfo broker, AMQConnection connection)
+ {
+ _stopEvent = new ManualResetEvent(false);
+ _protocolListener = connection.ProtocolListener;
+ _ioHandler = MakeBrokerConnection(broker, connection);
+ // todo: get default read size from config!
+ IProtocolDecoderOutput decoderOutput =
+ new ProtocolDecoderOutput(_protocolListener);
+ _amqpChannel =
+ new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+ // post an initial async read
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
+ }
+ /// <summary>
+ /// Close the broker connection
+ /// </summary>
+ public void Close()
+ {
+ StopReading();
+ CloseBrokerConnection();
+ }
+ private void StopReading()
+ {
+ _stopEvent.Set();
+ }
+ private void CloseBrokerConnection()
+ {
+ if ( _ioHandler != null )
+ {
+ _ioHandler.Dispose();
+ _ioHandler = null;
+ }
+ if ( _connector != null )
+ {
+ _connector.Dispose();
+ _connector = null;
+ }
+ }
+ private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection)
+ {
+ if ( broker.UseSSL )
+ {
+ _connector = new SslSocketConnector();
+ } else
+ {
+ _connector = new SocketConnector();
+ }
+ Stream stream = _connector.Connect(broker);
+ return new IoHandler(stream, connection.ProtocolListener);
+ }
+ private void OnAsyncReadDone(IAsyncResult result)
+ {
+ try
+ {
+ _amqpChannel.EndRead(result);
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
+ } catch ( Exception e )
+ {
+ // ignore any errors during closing
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _protocolListener.OnException(e);
+ }
+ }
+ #region IProtocolDecoderOutput Members
+ public void Write(object message)
+ {
+ _protocolListener.OnMessage((IDataBlock)message);
+ }
+ #endregion
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
new file mode 100644
index 0000000000..4540f01f4e
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
@@ -0,0 +1,92 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using Apache.Qpid.Buffer;
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+ class ByteChannel : IByteChannel
+ {
+ // Warning: don't use this log for regular logging.
+ private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel");
+ private IByteChannel _lowerChannel;
+ public ByteChannel(IByteChannel lowerChannel)
+ {
+ _lowerChannel = lowerChannel;
+ }
+ public ByteBuffer Read()
+ {
+ ByteBuffer result = _lowerChannel.Read();
+ // TODO: Move into decorator.
+ if (_ioTraceLog.IsDebugEnabled)
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", result));
+ }
+ return result;
+ }
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return _lowerChannel.BeginRead(callback, state);
+ }
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = _lowerChannel.EndRead(result);
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", buffer));
+ }
+ return buffer;
+ }
+ public void Write(ByteBuffer buffer)
+ {
+ // TODO: Move into decorator.
+ if (_ioTraceLog.IsDebugEnabled)
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+ _lowerChannel.Write(buffer);
+ }
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+ return _lowerChannel.BeginWrite(buffer, callback, state);
+ }
+ public void EndWrite(IAsyncResult result)
+ {
+ _lowerChannel.EndWrite(result);
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs
new file mode 100644
index 0000000000..137fa19c0d
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs
@@ -0,0 +1,34 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using Apache.Qpid.Client.Qms;
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+ interface ISocketConnector : IDisposable
+ {
+ string LocalEndpoint { get; }
+ Stream Connect(IBrokerInfo broker);
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs
new file mode 100644
index 0000000000..b6dd8c3be1
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs
@@ -0,0 +1,71 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using Apache.Qpid.Client.Qms;
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+ /// <summary>
+ /// Implements a TCP connection over regular sockets.
+ /// </summary>
+ class SocketConnector : ISocketConnector
+ {
+ private MyTcpClient _tcpClient;
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+ public Stream Connect(IBrokerInfo broker)
+ {
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port);
+ return _tcpClient.GetStream();
+ }
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+ class MyTcpClient : TcpClient
+ {
+ public MyTcpClient(string host, int port)
+ : base(host, port)
+ {
+ }
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs
new file mode 100644
index 0000000000..8436e6fc4f
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs
@@ -0,0 +1,107 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using log4net;
+using Apache.Qpid.Client.Qms;
+using Org.Mentalis.Security.Ssl;
+using MCertificate = Org.Mentalis.Security.Certificates.Certificate;
+using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain;
+namespace Apache.Qpid.Client.Transport.Socket.Blocking
+ /// <summary>
+ /// Implements a TLS v1.0 connection using the library
+ /// </summary>
+ /// <remarks>
+ /// It would've been easier to implement this at the StreamFilter
+ /// level, but unfortunately the Mentalis library doesn't support
+ /// a passthrough SSL stream class and is tied directly
+ /// to socket-like classes.
+ /// </remarks>
+ class SslSocketConnector : ISocketConnector
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector));
+ private MyTcpClient _tcpClient;
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+ public Stream Connect(IBrokerInfo broker)
+ {
+ MCertificate cert = GetClientCert(broker);
+ SecurityOptions options = new SecurityOptions(
+ SecureProtocol.Tls1, cert, ConnectionEnd.Client
+ );
+ if ( broker.SslOptions != null
+ && broker.SslOptions.IgnoreValidationErrors )
+ {
+ _logger.Warn("Ignoring any certificate validation errors during SSL handshake...");
+ options.VerificationType = CredentialVerification.None;
+ }
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port, options);
+ return _tcpClient.GetStream();
+ }
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+ private static MCertificate GetClientCert(IBrokerInfo broker)
+ {
+ // if a client certificate is configured,
+ // use that to enable mutual authentication
+ MCertificate cert = null;
+ if ( broker.SslOptions != null
+ && broker.SslOptions.ClientCertificate != null )
+ {
+ cert = MCertificate.CreateFromX509Certificate(
+ broker.SslOptions.ClientCertificate
+ );
+ _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true));
+ }
+ return cert;
+ }
+ class MyTcpClient : SecureTcpClient
+ {
+ public MyTcpClient(string host, int port, SecurityOptions options)
+ : base(host, port, options)
+ {
+ }
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+ }
+ }
diff --git a/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs b/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
new file mode 100644
index 0000000000..a06de9eac8
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
@@ -0,0 +1,98 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Common;
+namespace Apache.Qpid.Client.Util
+ internal delegate void ThresholdMethod(int currentCount);
+ /// <summary>
+ /// Basic bounded queue used to implement prefetching.
+ /// Notice we do the callbacks here asynchronously to
+ /// avoid adding more complexity to the channel impl.
+ /// </summary>
+ internal class FlowControlQueue
+ {
+ private BlockingQueue _queue = new LinkedBlockingQueue();
+ private int _itemCount;
+ private int _lowerBound;
+ private int _upperBound;
+ private ThresholdMethod _underThreshold;
+ private ThresholdMethod _overThreshold;
+ public FlowControlQueue(
+ int lowerBound,
+ int upperBound,
+ ThresholdMethod underThreshold,
+ ThresholdMethod overThreshold
+ )
+ {
+ _lowerBound = lowerBound;
+ _upperBound = upperBound;
+ _underThreshold = underThreshold;
+ _overThreshold = overThreshold;
+ }
+ public void Enqueue(object item)
+ {
+ _queue.EnqueueBlocking(item);
+ int count = Interlocked.Increment(ref _itemCount);
+ if ( _overThreshold != null )
+ {
+ if ( count == _upperBound )
+ {
+ _overThreshold.BeginInvoke(
+ count, new AsyncCallback(OnAsyncCallEnd),
+ _overThreshold
+ );
+ }
+ }
+ }
+ public object Dequeue()
+ {
+ object item = _queue.DequeueBlocking();
+ int count = Interlocked.Decrement(ref _itemCount);
+ if ( _underThreshold != null )
+ {
+ if ( count == _lowerBound )
+ {
+ _underThreshold.BeginInvoke(
+ count, new AsyncCallback(OnAsyncCallEnd),
+ _underThreshold
+ );
+ }
+ }
+ return item;
+ }
+ private void OnAsyncCallEnd(IAsyncResult res)
+ {
+ ThresholdMethod method = (ThresholdMethod)res.AsyncState;
+ method.EndInvoke(res);
+ }
+ }