diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2008-01-22 11:54:54 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2008-01-22 11:54:54 +0000 |
| commit | c076635c6b7e6494026ca398d124f95e66c13e06 (patch) | |
| tree | 8b787020357b497288a0c3bd0a19912c56027a32 /dotnet/Qpid.Client/Client | |
| parent | e8340832149070f8254eabee87276f2a415aecac (diff) | |
| download | qpid-python-c076635c6b7e6494026ca398d124f95e66c13e06.tar.gz | |
Qpid-730. Removed durable and subscription name from consumer builder, as they are not to do with consume but queue declaration. Durable subscribers must provide their own fixed queue names.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@614184 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 1005 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 5 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/Closeable.cs | 49 |
3 files changed, 618 insertions, 441 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 84f08729dd..b5f6b6eec9 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -33,26 +33,43 @@ using Apache.Qpid.Protocol; namespace Apache.Qpid.Client { + /// <summary> + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Declare queues. + /// <tr><td> Declare exchanges. + /// <tr><td> Bind queues to exchanges. + /// <tr><td> Create messages. + /// <tr><td> Set up message consumers on the channel. + /// <tr><td> Set up message producers on the channel. + /// <tr><td> Commit the current transaction. + /// <tr><td> Roll-back the current transaction. + /// <tr><td> Close the channel. + /// </table> + /// </summary> public class AmqChannel : Closeable, IChannel { private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); internal const int BASIC_CONTENT_TYPE = 60; + public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; + + public const int DEFAULT_PREFETCH_LOW_MARK = 2500; + private static int _nextSessionNumber = 0; + private AMQConnection _connection; + private int _sessionNumber; + private bool _suspended; + private object _suspensionLock = new object(); // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; - public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; - public const int DEFAULT_PREFETCH_LOW_MARK = 2500; - - private AMQConnection _connection; - private bool _transacted; private AcknowledgeMode _acknowledgeMode; @@ -60,6 +77,7 @@ namespace Apache.Qpid.Client private ushort _channelId; private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; private FlowControlQueue _queue; @@ -68,14 +86,10 @@ namespace Apache.Qpid.Client private MessageFactoryRegistry _messageFactoryRegistry; - /// <summary> - /// Set of all producers created by this session - /// </summary> + /// <summary> Holds all of the producers created by this channel. </summary> private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); - /// <summary> - /// Maps from consumer tag to JMSMessageConsumer instance - /// </summary> + /// <summary> Holds all of the consumers created by this channel. </summary> private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); private ArrayList _replayFrames = new ArrayList(); @@ -90,157 +104,253 @@ namespace Apache.Qpid.Client private long _nextProducerId; /// <summary> - /// Responsible for decoding a message fragment and passing it to the appropriate message consumer. + /// Initializes a new instance of the <see cref="AmqChannel"/> class. /// </summary> - private class Dispatcher + /// <param name="con">The connection.</param> + /// <param name="channelId">The channel id.</param> + /// <param name="transacted">if set to <c>true</c> [transacted].</param> + /// <param name="acknowledgeMode">The acknowledge mode.</param> + /// <param name="defaultPrefetchHigh">Default prefetch high value</param> + /// <param name="defaultPrefetchLow">Default prefetch low value</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, + int defaultPrefetchHigh, int defaultPrefetchLow) + : this() { - private int _stopped = 0; - - private AmqChannel _containingChannel; + _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); + _connection = con; + _transacted = transacted; - public Dispatcher(AmqChannel containingChannel) + if ( transacted ) { - _containingChannel = containingChannel; + _acknowledgeMode = AcknowledgeMode.SessionTransacted; + } + else + { + _acknowledgeMode = acknowledgeMode; } - - /// <summary> - /// Runs the dispatcher. This is intended to be Run in a separate thread. - /// </summary> - public void RunDispatcher() + + _channelId = channelId; + _defaultPrefetchHighMark = defaultPrefetchHigh; + _defaultPrefetchLowMark = defaultPrefetchLow; + + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) { - UnprocessedMessage message; + _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark, + new ThresholdMethod(OnPrefetchLowMark), + new ThresholdMethod(OnPrefetchHighMark)); + } + else + { + // low and upper are the same + _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark, + null, null); + } + } - while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) - { - //_queue.size() - DispatchMessage(message); - } + private AmqChannel() + { + _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + } - _logger.Debug("Dispatcher thread terminating for channel " + _containingChannel._channelId); + /// <summary> + /// Acknowledge mode for messages received. + /// </summary> + public AcknowledgeMode AcknowledgeMode + { + get + { + CheckNotClosed(); + return _acknowledgeMode; } + } - private void DispatchMessage(UnprocessedMessage message) + /// <summary> + /// True if the channel should use transactions. + /// </summary> + public bool Transacted + { + get { - if (message.DeliverBody != null) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + CheckNotClosed(); + return _transacted; + } + } - if (consumer == null) - { - _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); - } - else - { - consumer.NotifyMessage(message, _containingChannel.ChannelId); - } - } - else - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. - CreateMessage(0, false, message.ContentHeader, message.Bodies); + /// <summary> + /// Prefetch value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetch + { + get { return DefaultPrefetchHigh; } + } - int errorCode = message.BounceBody.ReplyCode; - string reason = message.BounceBody.ReplyText; - _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + /// <summary> + /// Prefetch low value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetchLow + { + get { return _defaultPrefetchLowMark; } + } - _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } - catch (Exception e) - { - _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } - } + /// <summary> + /// Prefetch high value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetchHigh + { + get { return _defaultPrefetchHighMark; } + } - public void StopDispatcher() - { - Interlocked.Exchange(ref _stopped, 1); - } + /// <summary> Indicates whether or not this channel is currently suspended. </summary> + public bool IsSuspended + { + get { return _suspended; } + } + + /// <summary> Provides the channels number within the the connection. </summary> + public ushort ChannelId + { + get { return _channelId; } + } + + /// <summary> Provides the connection that this channel runs over. </summary> + public AMQConnection Connection + { + get { return _connection; } } /// <summary> - /// Initializes a new instance of the <see cref="AmqChannel"/> class. + /// Declare a new exchange. /// </summary> - /// <param name="con">The connection.</param> - /// <param name="channelId">The channel id.</param> - /// <param name="transacted">if set to <c>true</c> [transacted].</param> - /// <param name="acknowledgeMode">The acknowledge mode.</param> - /// <param name="defaultPrefetchHigh">Default prefetch high value</param> - /// <param name="defaultPrefetchLow">Default prefetch low value</param> - internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) - : this() - { - _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); - _connection = con; - _transacted = transacted; - if ( transacted ) - { - _acknowledgeMode = AcknowledgeMode.SessionTransacted; - } else - { - _acknowledgeMode = acknowledgeMode; - } - _channelId = channelId; - _defaultPrefetchHighMark = defaultPrefetchHigh; - _defaultPrefetchLowMark = defaultPrefetchLow; - - if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) - { - _queue = new FlowControlQueue( - _defaultPrefetchLowMark, _defaultPrefetchHighMark, - new ThresholdMethod(OnPrefetchLowMark), - new ThresholdMethod(OnPrefetchHighMark) - ); - } else - { - // low and upper are the same - _queue = new FlowControlQueue( - _defaultPrefetchHighMark, _defaultPrefetchHighMark, - null, null - ); - } + /// <param name="exchangeName">Name of the exchange</param> + /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param> + public void DeclareExchange(String exchangeName, String exchangeClass) + { + _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); + + DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); } - private AmqChannel() + /// <summary> + /// Declare a new exchange using the default exchange class. + /// </summary> + /// <param name="exchangeName">Name of the exchange</param> + public void DeleteExchange(string exchangeName) { - _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + throw new NotImplementedException(); } /// <summary> - /// Create a disconnected channel that will fault - /// for most things, but is useful for testing + /// Declare a new queue with the specified set of arguments. /// </summary> - /// <returns>A new disconnected channel</returns> - public static IChannel CreateDisconnectedChannel() + /// <param name="queueName">Name of the queue</param> + /// <param name="isDurable">True if the queue should be durable</param> + /// <param name="isExclusive">True if the queue should be exclusive to this channel</param> + /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); + } + + /// <summary> + /// Delete a queue with the specifies arguments. + /// </summary> + /// <param name="queueName">Name of the queue to delete</param> + /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param> + /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param> + /// <param name="noWait">If true, the server will not respond to the method</param> + public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) { - return new AmqChannel(); + DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); } + /// <summary> + /// Generate a new Unique name to use for a queue. + /// </summary> + /// <returns>A unique name to this channel</returns> + public string GenerateUniqueName() + { + string result = _connection.ProtocolSession.GenerateQueueName(); + return Regex.Replace(result, "[^a-z0-9_]", "_"); + } - public IBytesMessage CreateBytesMessage() + /// <summary> + /// Removes all messages from a queue. + /// </summary> + /// <param name="queueName">Name of the queue to delete</param> + /// <param name="noWait">If true, the server will not respond to the method</param> + public void PurgeQueue(string queueName, bool noWait) { - return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + DoPurgeQueue(queueName, noWait); + } + + /// <summary> + /// Bind a queue to the specified exchange. + /// </summary> + /// <param name="queueName">Name of queue to bind</param> + /// <param name="exchangeName">Name of exchange to bind to</param> + /// <param name="routingKey">Routing key</param> + public void Bind(string queueName, string exchangeName, string routingKey) + { + DoBind(queueName, exchangeName, routingKey, new FieldTable()); } + /// <summary> + /// Bind a queue to the specified exchange. + /// </summary> + /// <param name="queueName">Name of queue to bind</param> + /// <param name="exchangeName">Name of exchange to bind to</param> + /// <param name="routingKey">Routing key</param> + /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param> + public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) + { + DoBind(queueName, exchangeName, routingKey, (FieldTable)args); + } + + /// <summary> + /// Create a new empty message with no body. + /// </summary> + /// <returns>The new message</returns> public IMessage CreateMessage() { - // TODO: this is supposed to create a message consisting only of message headers return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); } - + + /// <summary> + /// Create a new message of the specified MIME type. + /// </summary> + /// <param name="mimeType">The mime type to create</param> + /// <returns>The new message</returns> public IMessage CreateMessage(string mimeType) { - return _messageFactoryRegistry.CreateMessage(mimeType); + return _messageFactoryRegistry.CreateMessage(mimeType); } + /// <summary> + /// Creates a new message for bytes (application/octet-stream). + /// </summary> + /// <returns>The new message</returns> + public IBytesMessage CreateBytesMessage() + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + /// <summary> + /// Creates a new text message (text/plain) with empty content. + /// </summary> + /// <returns>The new message</returns> public ITextMessage CreateTextMessage() { return CreateTextMessage(String.Empty); } + /// <summary> + /// Creates a new text message (text/plain) with a body. + /// </summary> + /// <param name="text">Initial body of the message</param> + /// <returns>The new message</returns> public ITextMessage CreateTextMessage(string text) { ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); @@ -248,24 +358,92 @@ namespace Apache.Qpid.Client return msg; } - public bool Transacted + /// <summary> + /// Creates a new Consumer using the builder pattern. + /// </summary> + /// <param name="queueName">Name of queue to receive messages from</param> + /// <returns>The builder object</returns> + public MessageConsumerBuilder CreateConsumerBuilder(string queueName) { - get - { - CheckNotClosed(); - return _transacted; - } + return new MessageConsumerBuilder(this, queueName); } - public AcknowledgeMode AcknowledgeMode + /// <summary> + /// Creates a new consumer. + /// </summary> + /// <param name="queueName">Name of queue to receive messages from</param> + /// <param name="prefetchLow">Low prefetch value</param> + /// <param name="prefetchHigh">High prefetch value</param> + /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param> + /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param> + /// <returns>The new consumer</returns> + public IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive) { - get - { - CheckNotClosed(); - return _acknowledgeMode; - } + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); + + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive); + } + + /// <summary> + /// Unsubscribe from a queue. + /// </summary> + /// <param name="subscriptionName">Subscription name</param> + public void Unsubscribe(String name) + { + throw new NotImplementedException(); + } + + /// <summary> + /// Create a new message publisher using the builder pattern. + /// </summary> + /// <returns>The builder object</returns> + public MessagePublisherBuilder CreatePublisherBuilder() + { + return new MessagePublisherBuilder(this); + } + + /// <summary> + /// Create a new message publisher. + /// </summary> + /// <param name="exchangeName">Name of exchange to publish to</param> + /// <param name="routingKey">Routing key</param> + /// <param name="deliveryMode">Default delivery mode</param> + /// <param name="timeToLive">Default TTL time of messages</param> + /// <param name="immediate">If true, sent immediately</param> + /// <param name="mandatory">If true, the broker will return an error + /// (as a connection exception) if the message cannot be delivered</param> + /// <param name="priority">Default message priority</param> + /// <returns>The new message publisher</returns> + public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", + exchangeName, "none", routingKey)); + + return CreateProducerImpl(exchangeName, routingKey, deliveryMode, + timeToLive, immediate, mandatory, priority); + } + + /// <summary> + /// Recover after transaction failure. + /// </summary> + /// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks> + public void Recover() + { + CheckNotClosed(); + CheckNotTransacted(); + + throw new NotImplementedException(); } + /// <summary> + /// Commit the transaction. + /// </summary> public void Commit() { // FIXME: Fail over safety. Needs FailoverSupport? @@ -291,32 +469,50 @@ namespace Apache.Qpid.Client } } + /// <summary> + /// Rollback the transaction. + /// </summary> public void Rollback() { - lock ( _suspensionLock ) - { - CheckTransacted(); // throws IllegalOperationException if not a transacted session + lock (_suspensionLock) + { + CheckTransacted(); // throws IllegalOperationException if not a transacted session - try - { - bool suspended = IsSuspended; - if ( !suspended ) - Suspend(true); + try + { + bool suspended = IsSuspended; + if (!suspended) + { + Suspend(true); + } - // todo: rollback dispatcher when TX support is added - //if ( _dispatcher != null ) - // _dispatcher.Rollback(); + // todo: rollback dispatcher when TX support is added + //if ( _dispatcher != null ) + // _dispatcher.Rollback(); + + _connection.ConvenientProtocolWriter.SyncWrite( + TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); - _connection.ConvenientProtocolWriter.SyncWrite( - TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + if ( !suspended ) + { + Suspend(false); + } + } + catch (AMQException e) + { + throw new QpidException("Failed to rollback", e); + } + } + } - if ( !suspended ) - Suspend(false); - } catch ( AMQException e ) - { - throw new QpidException("Failed to rollback", e); - } - } + /// <summary> + /// Create a disconnected channel that will fault + /// for most things, but is useful for testing + /// </summary> + /// <returns>A new disconnected channel</returns> + public static IChannel CreateDisconnectedChannel() + { + return new AmqChannel(); } public override void Close() @@ -349,6 +545,56 @@ namespace Apache.Qpid.Client } } + /** + * Called when the server initiates the closure of the session + * unilaterally. + * @param e the exception that caused this session to be closed. Null causes the + */ + public void ClosedWithException(Exception e) + { + lock (_connection.FailoverMutex) + { + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + SetClosed(); + AMQException amqe; + + if (e is AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + + _connection.DeregisterSession(_channelId); + CloseProducersAndConsumers(amqe); + } + } + + public void MessageReceived(UnprocessedMessage message) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Message received in session with channel id " + _channelId); + } + + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } + else + { + _queue.Enqueue(message); + } + } + + public void Dispose() + { + Close(); + } + private void SetClosed() { Interlocked.Exchange(ref _closed, CLOSED); @@ -378,32 +624,6 @@ namespace Apache.Qpid.Client } } - /** - * Called when the server initiates the closure of the session - * unilaterally. - * @param e the exception that caused this session to be closed. Null causes the - */ - public void ClosedWithException(Exception e) - { - lock (_connection.FailoverMutex) - { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - SetClosed(); - AMQException amqe; - if (e is AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - _connection.DeregisterSession(_channelId); - CloseProducersAndConsumers(amqe); - } - } - /// <summary> /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is /// currently no way of propagating errors to message producers (this is a JMS limitation). @@ -411,6 +631,7 @@ namespace Apache.Qpid.Client private void CloseProducers() { _logger.Debug("Closing producers on session " + this); + // we need to clone the list of producers since the close() method updates the _producers collection // which would result in a concurrent modification exception ArrayList clonedProducers = new ArrayList(_producers.Values); @@ -420,19 +641,20 @@ namespace Apache.Qpid.Client _logger.Debug("Closing producer " + prod); prod.Close(); } + // at this point the _producers map is empty } /// <summary> /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. /// <param name="error">not null if this is a result of an error occurring at the connection level</param> - /// private void CloseConsumers(Exception error) { if (_dispatcher != null) { _dispatcher.StopDispatcher(); } + // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception ArrayList clonedConsumers = new ArrayList(_consumers.Values); @@ -448,33 +670,11 @@ namespace Apache.Qpid.Client con.Close(); } } - // at this point the _consumers map will be empty - } - - public void Recover() - { - CheckNotClosed(); - CheckNotTransacted(); // throws IllegalOperationException if not a transacted session - // TODO: This cannot be implemented using 0.8 semantics - throw new NotImplementedException(); - } - - public void Run() - { - throw new NotImplementedException(); - } - - public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, - long timeToLive, bool immediate, bool mandatory, int priority) - { - _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", - exchangeName, "none", routingKey)); - return CreateProducerImpl(exchangeName, routingKey, deliveryMode, - timeToLive, immediate, mandatory, priority); + // at this point the _consumers map will be empty } - public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, + private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, DeliveryMode deliveryMode, long timeToLive, bool immediate, bool mandatory, int priority) { @@ -496,32 +696,21 @@ namespace Apache.Qpid.Client } } - public IMessageConsumer CreateConsumer(string queueName, - int prefetchLow, - int prefetchHigh, - bool noLocal, - bool exclusive, - bool durable, - string subscriptionName) - { - _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}", - queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName)); - return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName); - } - + /// <summary> Creates a message consumer on this channel.</summary> + /// + /// <param name="queueName">The name of the queue to attach the consumer to.</param> + /// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param> + /// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param> + /// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param> + /// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param> + /// + /// <return>The message consumer.</return> private IMessageConsumer CreateConsumerImpl(string queueName, int prefetchLow, int prefetchHigh, bool noLocal, - bool exclusive, - bool durable, - string subscriptionName) + bool exclusive) { - if (durable || subscriptionName != null) - { - throw new NotImplementedException(); // TODO: durable subscriptions. - } - lock (_closingLock) { CheckNotClosed(); @@ -542,11 +731,6 @@ namespace Apache.Qpid.Client } } - public void Unsubscribe(String name) - { - throw new NotImplementedException(); // FIXME - } - private void CheckTransacted() { if (!Transacted) @@ -562,55 +746,7 @@ namespace Apache.Qpid.Client throw new InvalidOperationException("Channel is transacted"); } } - - public void MessageReceived(UnprocessedMessage message) - { - if (_logger.IsDebugEnabled) - { - _logger.Debug("Message received in session with channel id " + _channelId); - } - if ( message.DeliverBody == null ) - { - ReturnBouncedMessage(message); - } else - { - _queue.Enqueue(message); - } - } - - public int DefaultPrefetch - { - get { return DefaultPrefetchHigh; } - } - public int DefaultPrefetchLow - { - get { return _defaultPrefetchLowMark; } - } - public int DefaultPrefetchHigh - { - get { return _defaultPrefetchHighMark; } - } - public bool IsSuspended - { - get { return _suspended; } - } - - public ushort ChannelId - { - get - { - return _channelId; - } - } - - public AMQConnection Connection - { - get - { - return _connection; - } - } - + internal void Start() { _dispatcher = new Dispatcher(this); @@ -658,11 +794,6 @@ namespace Apache.Qpid.Client return ++_nextProducerId; } - public void Dispose() - { - Close(); - } - /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after * failover when the client has veoted resubscription. @@ -714,11 +845,6 @@ namespace Apache.Qpid.Client // at this point the _consumers map will be empty } - public void PurgeQueue(string queueName, bool noWait) - { - DoPurgeQueue(queueName, noWait); - } - private void DoPurgeQueue(string queueName, bool noWait) { try @@ -757,29 +883,19 @@ namespace Apache.Qpid.Client /// Callers must hold the failover mutex before calling this method. /// </summary> /// <param name="consumer"></param> - void RegisterConsumer(BasicMessageConsumer consumer) + private void RegisterConsumer(BasicMessageConsumer consumer) { String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode); + consumer.Exclusive, consumer.AcknowledgeMode); consumer.ConsumerTag = consumerTag; _consumers.Add(consumerTag, consumer); } - public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) - { - DoBind(queueName, exchangeName, routingKey, (FieldTable)args); - } - - public void Bind(string queueName, string exchangeName, string routingKey) - { - DoBind(queueName, exchangeName, routingKey, new FieldTable()); - } - internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) { _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", - queueName, exchangeName, routingKey, args)); + queueName, exchangeName, routingKey, args)); AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, @@ -798,9 +914,9 @@ namespace Apache.Qpid.Client String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, - queueName, tag, noLocal, - acknowledgeMode == AcknowledgeMode.NoAcknowledge, - exclusive, true, new FieldTable()); + queueName, tag, noLocal, + acknowledgeMode == AcknowledgeMode.NoAcknowledge, + exclusive, true, new FieldTable()); _replayFrames.Add(basicConsume); @@ -808,34 +924,24 @@ namespace Apache.Qpid.Client return tag; } - public void DeleteExchange(string exchangeName) - { - throw new NotImplementedException(); // FIXME - } - - public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) - { - DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); - } - private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) { try { _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); - AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, - queueName, // queueName - ifUnused, // IfUnUsed - ifEmpty, // IfEmpty - noWait); // NoWait + AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); _replayFrames.Add(queueDelete); if (noWait) + { _connection.ProtocolWriter.Write(queueDelete); + } else + { _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); + } } catch (AMQException) { @@ -843,34 +949,12 @@ namespace Apache.Qpid.Client } } - public MessageConsumerBuilder CreateConsumerBuilder(string queueName) - { - return new MessageConsumerBuilder(this, queueName); - } - - public MessagePublisherBuilder CreatePublisherBuilder() - { - return new MessagePublisherBuilder(this); - } - - public string GenerateUniqueName() - { - string result = _connection.ProtocolSession.GenerateQueueName(); - return Regex.Replace(result, "[^a-z0-9_]", "_"); - } - - public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) - { - DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); - } - private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) { _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", queueName, isDurable, isExclusive, isAutoDelete)); - AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, - false, isDurable, isExclusive, + AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, isAutoDelete, true, null); _replayFrames.Add(queueDeclare); @@ -881,23 +965,16 @@ namespace Apache.Qpid.Client } } - public void DeclareExchange(String exchangeName, String exchangeClass) - { - _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); - - DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); - } - // AMQP-level method. private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, string exchangeClass, bool passive, bool durable, bool autoDelete, bool xinternal, bool noWait, FieldTable args) { _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", - _channelId, exchangeName, exchangeClass)); + _channelId, exchangeName, exchangeClass)); - AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame( - channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); + AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, + durable, autoDelete, xinternal, noWait, args); _replayFrames.Add(declareExchange); @@ -911,7 +988,7 @@ namespace Apache.Qpid.Client else { throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); -// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); + //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } @@ -935,75 +1012,167 @@ namespace Apache.Qpid.Client _connection.ProtocolWriter.Write(ackFrame); } - /// <summary> - /// Handle a message that bounced from the server, creating - /// the corresponding exception and notifying the connection about it - /// </summary> - /// <param name="message">Unprocessed message</param> - private void ReturnBouncedMessage(UnprocessedMessage message) - { - try - { - AbstractQmsMessage bouncedMessage = - _messageFactoryRegistry.CreateMessage( - 0, false, message.ContentHeader, - message.Bodies - ); - - int errorCode = message.BounceBody.ReplyCode; - string reason = message.BounceBody.ReplyText; - _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); - AMQException exception; - if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) - { - exception = new AMQNoConsumersException(reason, bouncedMessage); - } else if ( errorCode == AMQConstant.NO_ROUTE.Code ) - { - exception = new AMQNoRouteException(reason, bouncedMessage); - } else - { - exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); - } - _connection.ExceptionReceived(exception); - } catch ( Exception ex ) - { - _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); - } - - } - - private void OnPrefetchLowMark(int count) - { - if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) - { - _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); - Suspend(false); - } - } - private void OnPrefetchHighMark(int count) - { - if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) - { - _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); - Suspend(true); - } - } - - private void Suspend(bool suspend) - { - lock ( _suspensionLock ) - { - if ( _logger.IsDebugEnabled ) - { - _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); - } - - _suspended = suspend; - AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); - - Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); - } - } + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + + if (errorCode == AMQConstant.NO_CONSUMERS.Code) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } + else if (errorCode == AMQConstant.NO_ROUTE.Code) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } + else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + + _connection.ExceptionReceived(exception); + } + catch (Exception ex) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + } + + private void OnPrefetchLowMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); + Suspend(false); + } + } + + private void OnPrefetchHighMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); + Suspend(true); + } + } + + private void Suspend(bool suspend) + { + lock (_suspensionLock) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } + + _suspended = suspend; + AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); + + Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); + } + } + + /// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers. + /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message + /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new + /// message. + /// + /// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be + /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a + /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition. + /// + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/> + /// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/> + /// </table> + /// </summary> + /// + /// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message. + /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on + /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks> + /// + /// <remarks>Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should + /// fall through and termiante the loop, as it is a bug if it occurrs.</remarks> + private class Dispatcher + { + /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary> + private int _stopped = 0; + + /// <summary> The channel for which this is a dispatcher. </summary> + private AmqChannel _containingChannel; + + /// <summary> Creates a dispatcher on the specified channel. </summary> + /// + /// <param name="containingChannel"> The channel on which this is a dispatcher. </param> + public Dispatcher(AmqChannel containingChannel) + { + _containingChannel = containingChannel; + } + + /// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and + /// the connection of bounced messages.</summary> + public void RunDispatcher() + { + UnprocessedMessage message; + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) + { + if (message.DeliverBody != null) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + + if (consumer == null) + { + _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); + } + else + { + consumer.NotifyMessage(message, _containingChannel.ChannelId); + } + } + else + { + try + { + // Bounced message is processed here, away from the transport thread + AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. + CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + catch (Exception e) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + } + + _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + "."); + } + + /// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary> + public void StopDispatcher() + { + Interlocked.Exchange(ref _stopped, 1); + } + } } } diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index fd4ff79505..6a5ba82264 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -34,9 +34,7 @@ namespace Apache.Qpid.Client private bool _noLocal; - /** - * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover - */ + /** Holds the exclusive status flag for the consumers access to its queue. */ private bool _exclusive; public bool Exclusive @@ -448,6 +446,5 @@ namespace Apache.Qpid.Client break; } } - } } diff --git a/dotnet/Qpid.Client/Client/Closeable.cs b/dotnet/Qpid.Client/Client/Closeable.cs index 382e0aefde..b9664ccea3 100644 --- a/dotnet/Qpid.Client/Client/Closeable.cs +++ b/dotnet/Qpid.Client/Client/Closeable.cs @@ -23,26 +23,42 @@ using Apache.Qpid.Messaging; namespace Apache.Qpid.Client { - public abstract class Closeable : ICloseable + /// <summary>Closeable provides monitoring of the state of a closeable resource; whether it is open or closed. It also provides a lock on which + /// attempts to close the resource from multiple threads can be coordinated. + /// + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Close (and clean-up) a resource. + /// <tr><td> Monitor the state of a closeable resource. + /// <tr><td> Synchronous attempts to close resource from concurrent threads. + /// </table> + /// </summary> + /// + /// <remarks>Poor encapsulation of the close lock. Better to completely hide the implementation, such that there is a method, e.g., DoSingleClose, + /// that sub-classes implement. Guaranteed to only be called by one thread at once, and iff the object is not already closed. That is, multiple + /// simultaneous closes will result in a single call to the real close method. Put the wait and condition checking loop in this base class. + /// </remarks> + public abstract class Closeable : ICloseable { - /// <summary> - /// Used to ensure orderly closing of the object. The only method that is allowed to be called - /// from another thread of control is close(). - /// </summary> + /// <summary> Constant representing the closed state. </summary> + protected const int CLOSED = 1; + + /// <summary> Constant representing the open state. </summary> + protected const int NOT_CLOSED = 2; + + /// <summary> Used to ensure orderly closing of the object. </summary> protected readonly object _closingLock = new object(); - /// <summary> - /// All access to this field should be using the Inerlocked class, to make it atomic. - /// Hence it is an int since you cannot use a bool with the Interlocked class. - /// </summary> + /// <summary> Indicates the state of this resource; open or closed. </summary> protected int _closed = NOT_CLOSED; - protected const int CLOSED = 1; - protected const int NOT_CLOSED = 2; - /// <summary> /// Checks the not closed. /// </summary> + /// + /// <remarks>Don't like check methods that throw exceptions. a) it can come as a surprise without checked exceptions, b) it limits the + /// callers choice, if the caller would prefer a boolean, c) it is not side-effect free programming, where such could be used. Get rid + /// of this and replace with boolean.</remarks> protected void CheckNotClosed() { if (_closed == CLOSED) @@ -51,9 +67,7 @@ namespace Apache.Qpid.Client } } - /// <summary> - /// Gets a value indicating whether this <see cref="Closeable"/> is closed. - /// </summary> + /// <summary>Indicates whether this resource is closed.</summary> /// <value><c>true</c> if closed; otherwise, <c>false</c>.</value> public bool Closed { @@ -63,10 +77,7 @@ namespace Apache.Qpid.Client } } - /// <summary> - /// Close the resource - /// </summary> - /// <exception cref="QpidMessagingException">If something goes wrong</exception> + /// <summary> Close this resource. </summary> public abstract void Close(); } } |
