From c076635c6b7e6494026ca398d124f95e66c13e06 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Tue, 22 Jan 2008 11:54:54 +0000 Subject: Qpid-730. Removed durable and subscription name from consumer builder, as they are not to do with consume but queue declaration. Durable subscribers must provide their own fixed queue names. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@614184 13f79535-47bb-0310-9956-ffa450edef68 --- dotnet/Qpid.Client/Client/AmqChannel.cs | 1005 ++++++++++++--------- dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 5 +- dotnet/Qpid.Client/Client/Closeable.cs | 49 +- 3 files changed, 618 insertions(+), 441 deletions(-) (limited to 'dotnet/Qpid.Client/Client') diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 84f08729dd..b5f6b6eec9 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -33,26 +33,43 @@ using Apache.Qpid.Protocol; namespace Apache.Qpid.Client { + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Declare queues. + ///
Declare exchanges. + ///
Bind queues to exchanges. + ///
Create messages. + ///
Set up message consumers on the channel. + ///
Set up message producers on the channel. + ///
Commit the current transaction. + ///
Roll-back the current transaction. + ///
Close the channel. + ///
+ ///

public class AmqChannel : Closeable, IChannel { private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); internal const int BASIC_CONTENT_TYPE = 60; + public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; + + public const int DEFAULT_PREFETCH_LOW_MARK = 2500; + private static int _nextSessionNumber = 0; + private AMQConnection _connection; + private int _sessionNumber; + private bool _suspended; + private object _suspensionLock = new object(); // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; - public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; - public const int DEFAULT_PREFETCH_LOW_MARK = 2500; - - private AMQConnection _connection; - private bool _transacted; private AcknowledgeMode _acknowledgeMode; @@ -60,6 +77,7 @@ namespace Apache.Qpid.Client private ushort _channelId; private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; private FlowControlQueue _queue; @@ -68,14 +86,10 @@ namespace Apache.Qpid.Client private MessageFactoryRegistry _messageFactoryRegistry; - /// - /// Set of all producers created by this session - /// + /// Holds all of the producers created by this channel. private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); - /// - /// Maps from consumer tag to JMSMessageConsumer instance - /// + /// Holds all of the consumers created by this channel. private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); private ArrayList _replayFrames = new ArrayList(); @@ -90,157 +104,253 @@ namespace Apache.Qpid.Client private long _nextProducerId; /// - /// Responsible for decoding a message fragment and passing it to the appropriate message consumer. + /// Initializes a new instance of the class. /// - private class Dispatcher + /// The connection. + /// The channel id. + /// if set to true [transacted]. + /// The acknowledge mode. + /// Default prefetch high value + /// Default prefetch low value + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, + int defaultPrefetchHigh, int defaultPrefetchLow) + : this() { - private int _stopped = 0; - - private AmqChannel _containingChannel; + _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); + _connection = con; + _transacted = transacted; - public Dispatcher(AmqChannel containingChannel) + if ( transacted ) { - _containingChannel = containingChannel; + _acknowledgeMode = AcknowledgeMode.SessionTransacted; + } + else + { + _acknowledgeMode = acknowledgeMode; } - - /// - /// Runs the dispatcher. This is intended to be Run in a separate thread. - /// - public void RunDispatcher() + + _channelId = channelId; + _defaultPrefetchHighMark = defaultPrefetchHigh; + _defaultPrefetchLowMark = defaultPrefetchLow; + + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) { - UnprocessedMessage message; + _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark, + new ThresholdMethod(OnPrefetchLowMark), + new ThresholdMethod(OnPrefetchHighMark)); + } + else + { + // low and upper are the same + _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark, + null, null); + } + } - while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) - { - //_queue.size() - DispatchMessage(message); - } + private AmqChannel() + { + _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + } - _logger.Debug("Dispatcher thread terminating for channel " + _containingChannel._channelId); + /// + /// Acknowledge mode for messages received. + /// + public AcknowledgeMode AcknowledgeMode + { + get + { + CheckNotClosed(); + return _acknowledgeMode; } + } - private void DispatchMessage(UnprocessedMessage message) + /// + /// True if the channel should use transactions. + /// + public bool Transacted + { + get { - if (message.DeliverBody != null) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + CheckNotClosed(); + return _transacted; + } + } - if (consumer == null) - { - _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); - } - else - { - consumer.NotifyMessage(message, _containingChannel.ChannelId); - } - } - else - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. - CreateMessage(0, false, message.ContentHeader, message.Bodies); + /// + /// Prefetch value to be used as the default for + /// consumers created on this channel. + /// + public int DefaultPrefetch + { + get { return DefaultPrefetchHigh; } + } - int errorCode = message.BounceBody.ReplyCode; - string reason = message.BounceBody.ReplyText; - _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + /// + /// Prefetch low value to be used as the default for + /// consumers created on this channel. + /// + public int DefaultPrefetchLow + { + get { return _defaultPrefetchLowMark; } + } - _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } - catch (Exception e) - { - _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } - } + /// + /// Prefetch high value to be used as the default for + /// consumers created on this channel. + /// + public int DefaultPrefetchHigh + { + get { return _defaultPrefetchHighMark; } + } - public void StopDispatcher() - { - Interlocked.Exchange(ref _stopped, 1); - } + /// Indicates whether or not this channel is currently suspended. + public bool IsSuspended + { + get { return _suspended; } + } + + /// Provides the channels number within the the connection. + public ushort ChannelId + { + get { return _channelId; } + } + + /// Provides the connection that this channel runs over. + public AMQConnection Connection + { + get { return _connection; } } /// - /// Initializes a new instance of the class. + /// Declare a new exchange. /// - /// The connection. - /// The channel id. - /// if set to true [transacted]. - /// The acknowledge mode. - /// Default prefetch high value - /// Default prefetch low value - internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) - : this() - { - _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); - _connection = con; - _transacted = transacted; - if ( transacted ) - { - _acknowledgeMode = AcknowledgeMode.SessionTransacted; - } else - { - _acknowledgeMode = acknowledgeMode; - } - _channelId = channelId; - _defaultPrefetchHighMark = defaultPrefetchHigh; - _defaultPrefetchLowMark = defaultPrefetchLow; - - if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) - { - _queue = new FlowControlQueue( - _defaultPrefetchLowMark, _defaultPrefetchHighMark, - new ThresholdMethod(OnPrefetchLowMark), - new ThresholdMethod(OnPrefetchHighMark) - ); - } else - { - // low and upper are the same - _queue = new FlowControlQueue( - _defaultPrefetchHighMark, _defaultPrefetchHighMark, - null, null - ); - } + /// Name of the exchange + /// Class of the exchange, from + public void DeclareExchange(String exchangeName, String exchangeClass) + { + _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); + + DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); } - private AmqChannel() + /// + /// Declare a new exchange using the default exchange class. + /// + /// Name of the exchange + public void DeleteExchange(string exchangeName) { - _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + throw new NotImplementedException(); } /// - /// Create a disconnected channel that will fault - /// for most things, but is useful for testing + /// Declare a new queue with the specified set of arguments. /// - /// A new disconnected channel - public static IChannel CreateDisconnectedChannel() + /// Name of the queue + /// True if the queue should be durable + /// True if the queue should be exclusive to this channel + /// True if the queue should be deleted when the channel closes + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) { - return new AmqChannel(); + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); } + /// + /// Delete a queue with the specifies arguments. + /// + /// Name of the queue to delete + /// If true, the queue will not deleted if it has no consumers + /// If true, the queue will not deleted if it has no messages + /// If true, the server will not respond to the method + public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) + { + DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); + } - public IBytesMessage CreateBytesMessage() + /// + /// Generate a new Unique name to use for a queue. + /// + /// A unique name to this channel + public string GenerateUniqueName() { - return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + string result = _connection.ProtocolSession.GenerateQueueName(); + return Regex.Replace(result, "[^a-z0-9_]", "_"); } + /// + /// Removes all messages from a queue. + /// + /// Name of the queue to delete + /// If true, the server will not respond to the method + public void PurgeQueue(string queueName, bool noWait) + { + DoPurgeQueue(queueName, noWait); + } + + /// + /// Bind a queue to the specified exchange. + /// + /// Name of queue to bind + /// Name of exchange to bind to + /// Routing key + public void Bind(string queueName, string exchangeName, string routingKey) + { + DoBind(queueName, exchangeName, routingKey, new FieldTable()); + } + + /// + /// Bind a queue to the specified exchange. + /// + /// Name of queue to bind + /// Name of exchange to bind to + /// Routing key + /// Table of arguments for the binding. Used to bind with a Headers Exchange + public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) + { + DoBind(queueName, exchangeName, routingKey, (FieldTable)args); + } + + /// + /// Create a new empty message with no body. + /// + /// The new message public IMessage CreateMessage() { - // TODO: this is supposed to create a message consisting only of message headers return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); } - + + /// + /// Create a new message of the specified MIME type. + /// + /// The mime type to create + /// The new message public IMessage CreateMessage(string mimeType) { - return _messageFactoryRegistry.CreateMessage(mimeType); + return _messageFactoryRegistry.CreateMessage(mimeType); } + /// + /// Creates a new message for bytes (application/octet-stream). + /// + /// The new message + public IBytesMessage CreateBytesMessage() + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + /// + /// Creates a new text message (text/plain) with empty content. + /// + /// The new message public ITextMessage CreateTextMessage() { return CreateTextMessage(String.Empty); } + /// + /// Creates a new text message (text/plain) with a body. + /// + /// Initial body of the message + /// The new message public ITextMessage CreateTextMessage(string text) { ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); @@ -248,24 +358,92 @@ namespace Apache.Qpid.Client return msg; } - public bool Transacted + /// + /// Creates a new Consumer using the builder pattern. + /// + /// Name of queue to receive messages from + /// The builder object + public MessageConsumerBuilder CreateConsumerBuilder(string queueName) { - get - { - CheckNotClosed(); - return _transacted; - } + return new MessageConsumerBuilder(this, queueName); } - public AcknowledgeMode AcknowledgeMode + /// + /// Creates a new consumer. + /// + /// Name of queue to receive messages from + /// Low prefetch value + /// High prefetch value + /// If true, messages sent on this channel will not be received by this consumer + /// If true, the consumer opens the queue in exclusive mode + /// The new consumer + public IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive) { - get - { - CheckNotClosed(); - return _acknowledgeMode; - } + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); + + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive); + } + + /// + /// Unsubscribe from a queue. + /// + /// Subscription name + public void Unsubscribe(String name) + { + throw new NotImplementedException(); + } + + /// + /// Create a new message publisher using the builder pattern. + /// + /// The builder object + public MessagePublisherBuilder CreatePublisherBuilder() + { + return new MessagePublisherBuilder(this); } + + /// + /// Create a new message publisher. + /// + /// Name of exchange to publish to + /// Routing key + /// Default delivery mode + /// Default TTL time of messages + /// If true, sent immediately + /// If true, the broker will return an error + /// (as a connection exception) if the message cannot be delivered + /// Default message priority + /// The new message publisher + public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", + exchangeName, "none", routingKey)); + + return CreateProducerImpl(exchangeName, routingKey, deliveryMode, + timeToLive, immediate, mandatory, priority); + } + + /// + /// Recover after transaction failure. + /// + /// The 0-8 protocol does not support this, not implemented exception will always be thrown. + public void Recover() + { + CheckNotClosed(); + CheckNotTransacted(); + throw new NotImplementedException(); + } + + /// + /// Commit the transaction. + /// public void Commit() { // FIXME: Fail over safety. Needs FailoverSupport? @@ -291,32 +469,50 @@ namespace Apache.Qpid.Client } } + /// + /// Rollback the transaction. + /// public void Rollback() { - lock ( _suspensionLock ) - { - CheckTransacted(); // throws IllegalOperationException if not a transacted session + lock (_suspensionLock) + { + CheckTransacted(); // throws IllegalOperationException if not a transacted session - try - { - bool suspended = IsSuspended; - if ( !suspended ) - Suspend(true); + try + { + bool suspended = IsSuspended; + if (!suspended) + { + Suspend(true); + } - // todo: rollback dispatcher when TX support is added - //if ( _dispatcher != null ) - // _dispatcher.Rollback(); + // todo: rollback dispatcher when TX support is added + //if ( _dispatcher != null ) + // _dispatcher.Rollback(); - _connection.ConvenientProtocolWriter.SyncWrite( - TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + _connection.ConvenientProtocolWriter.SyncWrite( + TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); - if ( !suspended ) - Suspend(false); - } catch ( AMQException e ) - { - throw new QpidException("Failed to rollback", e); - } - } + if ( !suspended ) + { + Suspend(false); + } + } + catch (AMQException e) + { + throw new QpidException("Failed to rollback", e); + } + } + } + + /// + /// Create a disconnected channel that will fault + /// for most things, but is useful for testing + /// + /// A new disconnected channel + public static IChannel CreateDisconnectedChannel() + { + return new AmqChannel(); } public override void Close() @@ -349,6 +545,56 @@ namespace Apache.Qpid.Client } } + /** + * Called when the server initiates the closure of the session + * unilaterally. + * @param e the exception that caused this session to be closed. Null causes the + */ + public void ClosedWithException(Exception e) + { + lock (_connection.FailoverMutex) + { + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + SetClosed(); + AMQException amqe; + + if (e is AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + + _connection.DeregisterSession(_channelId); + CloseProducersAndConsumers(amqe); + } + } + + public void MessageReceived(UnprocessedMessage message) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Message received in session with channel id " + _channelId); + } + + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } + else + { + _queue.Enqueue(message); + } + } + + public void Dispose() + { + Close(); + } + private void SetClosed() { Interlocked.Exchange(ref _closed, CLOSED); @@ -378,32 +624,6 @@ namespace Apache.Qpid.Client } } - /** - * Called when the server initiates the closure of the session - * unilaterally. - * @param e the exception that caused this session to be closed. Null causes the - */ - public void ClosedWithException(Exception e) - { - lock (_connection.FailoverMutex) - { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - SetClosed(); - AMQException amqe; - if (e is AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException("Closing session forcibly", e); - } - _connection.DeregisterSession(_channelId); - CloseProducersAndConsumers(amqe); - } - } - /// /// Called to close message producers cleanly. This may or may not be as a result of an error. There is /// currently no way of propagating errors to message producers (this is a JMS limitation). @@ -411,6 +631,7 @@ namespace Apache.Qpid.Client private void CloseProducers() { _logger.Debug("Closing producers on session " + this); + // we need to clone the list of producers since the close() method updates the _producers collection // which would result in a concurrent modification exception ArrayList clonedProducers = new ArrayList(_producers.Values); @@ -420,19 +641,20 @@ namespace Apache.Qpid.Client _logger.Debug("Closing producer " + prod); prod.Close(); } + // at this point the _producers map is empty } /// /// Called to close message consumers cleanly. This may or may not be as a result of an error. /// not null if this is a result of an error occurring at the connection level - /// private void CloseConsumers(Exception error) { if (_dispatcher != null) { _dispatcher.StopDispatcher(); } + // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception ArrayList clonedConsumers = new ArrayList(_consumers.Values); @@ -448,33 +670,11 @@ namespace Apache.Qpid.Client con.Close(); } } - // at this point the _consumers map will be empty - } - - public void Recover() - { - CheckNotClosed(); - CheckNotTransacted(); // throws IllegalOperationException if not a transacted session - // TODO: This cannot be implemented using 0.8 semantics - throw new NotImplementedException(); - } - - public void Run() - { - throw new NotImplementedException(); - } - - public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, - long timeToLive, bool immediate, bool mandatory, int priority) - { - _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", - exchangeName, "none", routingKey)); - return CreateProducerImpl(exchangeName, routingKey, deliveryMode, - timeToLive, immediate, mandatory, priority); + // at this point the _consumers map will be empty } - public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, + private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, DeliveryMode deliveryMode, long timeToLive, bool immediate, bool mandatory, int priority) { @@ -496,32 +696,21 @@ namespace Apache.Qpid.Client } } - public IMessageConsumer CreateConsumer(string queueName, - int prefetchLow, - int prefetchHigh, - bool noLocal, - bool exclusive, - bool durable, - string subscriptionName) - { - _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}", - queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName)); - return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName); - } - + /// Creates a message consumer on this channel. + /// + /// The name of the queue to attach the consumer to. + /// The pre-fetch buffer low-water mark. + /// The pre-fetch buffer high-water mark. + /// The no-local flag, true means that the consumer does not receive messages sent on this channel. + /// The exclusive flag, true gives this consumer exclusive receive access to the queue. + /// + /// The message consumer. private IMessageConsumer CreateConsumerImpl(string queueName, int prefetchLow, int prefetchHigh, bool noLocal, - bool exclusive, - bool durable, - string subscriptionName) + bool exclusive) { - if (durable || subscriptionName != null) - { - throw new NotImplementedException(); // TODO: durable subscriptions. - } - lock (_closingLock) { CheckNotClosed(); @@ -542,11 +731,6 @@ namespace Apache.Qpid.Client } } - public void Unsubscribe(String name) - { - throw new NotImplementedException(); // FIXME - } - private void CheckTransacted() { if (!Transacted) @@ -562,55 +746,7 @@ namespace Apache.Qpid.Client throw new InvalidOperationException("Channel is transacted"); } } - - public void MessageReceived(UnprocessedMessage message) - { - if (_logger.IsDebugEnabled) - { - _logger.Debug("Message received in session with channel id " + _channelId); - } - if ( message.DeliverBody == null ) - { - ReturnBouncedMessage(message); - } else - { - _queue.Enqueue(message); - } - } - - public int DefaultPrefetch - { - get { return DefaultPrefetchHigh; } - } - public int DefaultPrefetchLow - { - get { return _defaultPrefetchLowMark; } - } - public int DefaultPrefetchHigh - { - get { return _defaultPrefetchHighMark; } - } - public bool IsSuspended - { - get { return _suspended; } - } - - public ushort ChannelId - { - get - { - return _channelId; - } - } - - public AMQConnection Connection - { - get - { - return _connection; - } - } - + internal void Start() { _dispatcher = new Dispatcher(this); @@ -658,11 +794,6 @@ namespace Apache.Qpid.Client return ++_nextProducerId; } - public void Dispose() - { - Close(); - } - /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after * failover when the client has veoted resubscription. @@ -714,11 +845,6 @@ namespace Apache.Qpid.Client // at this point the _consumers map will be empty } - public void PurgeQueue(string queueName, bool noWait) - { - DoPurgeQueue(queueName, noWait); - } - private void DoPurgeQueue(string queueName, bool noWait) { try @@ -757,29 +883,19 @@ namespace Apache.Qpid.Client /// Callers must hold the failover mutex before calling this method. /// /// - void RegisterConsumer(BasicMessageConsumer consumer) + private void RegisterConsumer(BasicMessageConsumer consumer) { String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode); + consumer.Exclusive, consumer.AcknowledgeMode); consumer.ConsumerTag = consumerTag; _consumers.Add(consumerTag, consumer); } - public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) - { - DoBind(queueName, exchangeName, routingKey, (FieldTable)args); - } - - public void Bind(string queueName, string exchangeName, string routingKey) - { - DoBind(queueName, exchangeName, routingKey, new FieldTable()); - } - internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) { _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", - queueName, exchangeName, routingKey, args)); + queueName, exchangeName, routingKey, args)); AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, queueName, exchangeName, @@ -798,9 +914,9 @@ namespace Apache.Qpid.Client String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, - queueName, tag, noLocal, - acknowledgeMode == AcknowledgeMode.NoAcknowledge, - exclusive, true, new FieldTable()); + queueName, tag, noLocal, + acknowledgeMode == AcknowledgeMode.NoAcknowledge, + exclusive, true, new FieldTable()); _replayFrames.Add(basicConsume); @@ -808,34 +924,24 @@ namespace Apache.Qpid.Client return tag; } - public void DeleteExchange(string exchangeName) - { - throw new NotImplementedException(); // FIXME - } - - public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) - { - DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); - } - private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) { try { _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); - AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, - queueName, // queueName - ifUnused, // IfUnUsed - ifEmpty, // IfEmpty - noWait); // NoWait + AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); _replayFrames.Add(queueDelete); if (noWait) + { _connection.ProtocolWriter.Write(queueDelete); + } else + { _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); + } } catch (AMQException) { @@ -843,34 +949,12 @@ namespace Apache.Qpid.Client } } - public MessageConsumerBuilder CreateConsumerBuilder(string queueName) - { - return new MessageConsumerBuilder(this, queueName); - } - - public MessagePublisherBuilder CreatePublisherBuilder() - { - return new MessagePublisherBuilder(this); - } - - public string GenerateUniqueName() - { - string result = _connection.ProtocolSession.GenerateQueueName(); - return Regex.Replace(result, "[^a-z0-9_]", "_"); - } - - public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) - { - DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); - } - private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) { _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", queueName, isDurable, isExclusive, isAutoDelete)); - AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, - false, isDurable, isExclusive, + AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, isAutoDelete, true, null); _replayFrames.Add(queueDeclare); @@ -881,23 +965,16 @@ namespace Apache.Qpid.Client } } - public void DeclareExchange(String exchangeName, String exchangeClass) - { - _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); - - DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); - } - // AMQP-level method. private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, string exchangeClass, bool passive, bool durable, bool autoDelete, bool xinternal, bool noWait, FieldTable args) { _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", - _channelId, exchangeName, exchangeClass)); + _channelId, exchangeName, exchangeClass)); - AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame( - channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); + AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, + durable, autoDelete, xinternal, noWait, args); _replayFrames.Add(declareExchange); @@ -911,7 +988,7 @@ namespace Apache.Qpid.Client else { throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); -// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); + //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } @@ -935,75 +1012,167 @@ namespace Apache.Qpid.Client _connection.ProtocolWriter.Write(ackFrame); } - /// - /// Handle a message that bounced from the server, creating - /// the corresponding exception and notifying the connection about it - /// - /// Unprocessed message - private void ReturnBouncedMessage(UnprocessedMessage message) - { - try - { - AbstractQmsMessage bouncedMessage = - _messageFactoryRegistry.CreateMessage( - 0, false, message.ContentHeader, - message.Bodies - ); - - int errorCode = message.BounceBody.ReplyCode; - string reason = message.BounceBody.ReplyText; - _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); - AMQException exception; - if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) - { - exception = new AMQNoConsumersException(reason, bouncedMessage); - } else if ( errorCode == AMQConstant.NO_ROUTE.Code ) - { - exception = new AMQNoRouteException(reason, bouncedMessage); - } else - { - exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); - } - _connection.ExceptionReceived(exception); - } catch ( Exception ex ) - { - _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); - } - - } - - private void OnPrefetchLowMark(int count) - { - if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) - { - _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); - Suspend(false); - } - } - private void OnPrefetchHighMark(int count) - { - if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) - { - _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); - Suspend(true); - } - } - - private void Suspend(bool suspend) - { - lock ( _suspensionLock ) - { - if ( _logger.IsDebugEnabled ) - { - _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); - } - - _suspended = suspend; - AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); - - Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); - } - } + /// + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// + /// Unprocessed message + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + + if (errorCode == AMQConstant.NO_CONSUMERS.Code) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } + else if (errorCode == AMQConstant.NO_ROUTE.Code) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } + else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + + _connection.ExceptionReceived(exception); + } + catch (Exception ex) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + } + + private void OnPrefetchLowMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); + Suspend(false); + } + } + + private void OnPrefetchHighMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); + Suspend(true); + } + } + + private void Suspend(bool suspend) + { + lock (_suspensionLock) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } + + _suspended = suspend; + AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); + + Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); + } + } + + /// A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers. + /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message + /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new + /// message. + /// + ///

The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be + /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a + /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Notify consumers of message arrivals on their queues. + ///
Notify the containing connection of bounced message arrivals. + ///
+ ///

+ /// + /// Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message. + /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on + /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag. + /// + /// Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should + /// fall through and termiante the loop, as it is a bug if it occurrs. + private class Dispatcher + { + /// Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). + private int _stopped = 0; + + /// The channel for which this is a dispatcher. + private AmqChannel _containingChannel; + + /// Creates a dispatcher on the specified channel. + /// + /// The channel on which this is a dispatcher. + public Dispatcher(AmqChannel containingChannel) + { + _containingChannel = containingChannel; + } + + /// The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and + /// the connection of bounced messages. + public void RunDispatcher() + { + UnprocessedMessage message; + + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) + { + if (message.DeliverBody != null) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + + if (consumer == null) + { + _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); + } + else + { + consumer.NotifyMessage(message, _containingChannel.ChannelId); + } + } + else + { + try + { + // Bounced message is processed here, away from the transport thread + AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. + CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + catch (Exception e) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + } + + _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + "."); + } + /// Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. + public void StopDispatcher() + { + Interlocked.Exchange(ref _stopped, 1); + } + } } } diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index fd4ff79505..6a5ba82264 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -34,9 +34,7 @@ namespace Apache.Qpid.Client private bool _noLocal; - /** - * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover - */ + /** Holds the exclusive status flag for the consumers access to its queue. */ private bool _exclusive; public bool Exclusive @@ -448,6 +446,5 @@ namespace Apache.Qpid.Client break; } } - } } diff --git a/dotnet/Qpid.Client/Client/Closeable.cs b/dotnet/Qpid.Client/Client/Closeable.cs index 382e0aefde..b9664ccea3 100644 --- a/dotnet/Qpid.Client/Client/Closeable.cs +++ b/dotnet/Qpid.Client/Client/Closeable.cs @@ -23,26 +23,42 @@ using Apache.Qpid.Messaging; namespace Apache.Qpid.Client { - public abstract class Closeable : ICloseable + /// Closeable provides monitoring of the state of a closeable resource; whether it is open or closed. It also provides a lock on which + /// attempts to close the resource from multiple threads can be coordinated. + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Close (and clean-up) a resource. + ///
Monitor the state of a closeable resource. + ///
Synchronous attempts to close resource from concurrent threads. + ///
+ ///

+ /// + /// Poor encapsulation of the close lock. Better to completely hide the implementation, such that there is a method, e.g., DoSingleClose, + /// that sub-classes implement. Guaranteed to only be called by one thread at once, and iff the object is not already closed. That is, multiple + /// simultaneous closes will result in a single call to the real close method. Put the wait and condition checking loop in this base class. + /// + public abstract class Closeable : ICloseable { - /// - /// Used to ensure orderly closing of the object. The only method that is allowed to be called - /// from another thread of control is close(). - /// + /// Constant representing the closed state. + protected const int CLOSED = 1; + + /// Constant representing the open state. + protected const int NOT_CLOSED = 2; + + /// Used to ensure orderly closing of the object. protected readonly object _closingLock = new object(); - /// - /// All access to this field should be using the Inerlocked class, to make it atomic. - /// Hence it is an int since you cannot use a bool with the Interlocked class. - /// + /// Indicates the state of this resource; open or closed. protected int _closed = NOT_CLOSED; - protected const int CLOSED = 1; - protected const int NOT_CLOSED = 2; - /// /// Checks the not closed. /// + /// + /// Don't like check methods that throw exceptions. a) it can come as a surprise without checked exceptions, b) it limits the + /// callers choice, if the caller would prefer a boolean, c) it is not side-effect free programming, where such could be used. Get rid + /// of this and replace with boolean. protected void CheckNotClosed() { if (_closed == CLOSED) @@ -51,9 +67,7 @@ namespace Apache.Qpid.Client } } - /// - /// Gets a value indicating whether this is closed. - /// + /// Indicates whether this resource is closed. /// true if closed; otherwise, false. public bool Closed { @@ -63,10 +77,7 @@ namespace Apache.Qpid.Client } } - /// - /// Close the resource - /// - /// If something goes wrong + /// Close this resource. public abstract void Close(); } } -- cgit v1.2.1