diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/AmqChannel.cs | 1190 |
1 files changed, 1190 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs new file mode 100644 index 0000000000..ce8e2ca2fe --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -0,0 +1,1190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text.RegularExpressions; +using System.Threading; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Util; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client +{ + /// <summary> + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Declare queues. + /// <tr><td> Declare exchanges. + /// <tr><td> Bind queues to exchanges. + /// <tr><td> Create messages. + /// <tr><td> Set up message consumers on the channel. + /// <tr><td> Set up message producers on the channel. + /// <tr><td> Commit the current transaction. + /// <tr><td> Roll-back the current transaction. + /// <tr><td> Close the channel. + /// </table> + /// </summary> + public class AmqChannel : Closeable, IChannel + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); + + internal const int BASIC_CONTENT_TYPE = 60; + + public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; + + public const int DEFAULT_PREFETCH_LOW_MARK = 2500; + + private static int _nextSessionNumber = 0; + + private AMQConnection _connection; + + private int _sessionNumber; + + private bool _suspended; + + private object _suspensionLock = new object(); + + // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. + private int _nextConsumerNumber = 1; + + private bool _transacted; + + private AcknowledgeMode _acknowledgeMode; + + private ushort _channelId; + + private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + + private FlowControlQueue _queue; + + private Dispatcher _dispatcher; + + private MessageFactoryRegistry _messageFactoryRegistry; + + /// <summary> Holds all of the producers created by this channel. </summary> + private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); + + /// <summary> Holds all of the consumers created by this channel. </summary> + private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); + + private ArrayList _replayFrames = new ArrayList(); + + /// <summary> + /// The counter of the _next producer id. This id is generated by the session and used only to allow the + /// producer to identify itself to the session when deregistering itself. + /// + /// Access to this id does not require to be synchronized since according to the JMS specification only one + /// thread of control is allowed to create producers for any given session instance. + /// </summary> + private long _nextProducerId; + + /// <summary> + /// Initializes a new instance of the <see cref="AmqChannel"/> class. + /// </summary> + /// <param name="con">The connection.</param> + /// <param name="channelId">The channel id.</param> + /// <param name="transacted">if set to <c>true</c> [transacted].</param> + /// <param name="acknowledgeMode">The acknowledge mode.</param> + /// <param name="defaultPrefetchHigh">Default prefetch high value</param> + /// <param name="defaultPrefetchLow">Default prefetch low value</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, + int defaultPrefetchHigh, int defaultPrefetchLow) + : this() + { + _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); + _connection = con; + _transacted = transacted; + + if ( transacted ) + { + _acknowledgeMode = AcknowledgeMode.SessionTransacted; + } + else + { + _acknowledgeMode = acknowledgeMode; + } + + _channelId = channelId; + _defaultPrefetchHighMark = defaultPrefetchHigh; + _defaultPrefetchLowMark = defaultPrefetchLow; + + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark, + new ThresholdMethod(OnPrefetchLowMark), + new ThresholdMethod(OnPrefetchHighMark)); + } + else + { + // low and upper are the same + _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark, + null, null); + } + } + + private AmqChannel() + { + _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + } + + /// <summary> + /// Acknowledge mode for messages received. + /// </summary> + public AcknowledgeMode AcknowledgeMode + { + get + { + CheckNotClosed(); + return _acknowledgeMode; + } + } + + /// <summary> + /// True if the channel should use transactions. + /// </summary> + public bool Transacted + { + get + { + CheckNotClosed(); + return _transacted; + } + } + + /// <summary> + /// Prefetch value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetch + { + get { return DefaultPrefetchHigh; } + } + + /// <summary> + /// Prefetch low value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetchLow + { + get { return _defaultPrefetchLowMark; } + } + + /// <summary> + /// Prefetch high value to be used as the default for + /// consumers created on this channel. + /// </summary> + public int DefaultPrefetchHigh + { + get { return _defaultPrefetchHighMark; } + } + + /// <summary> Indicates whether or not this channel is currently suspended. </summary> + public bool IsSuspended + { + get { return _suspended; } + } + + /// <summary> Provides the channels number within the the connection. </summary> + public ushort ChannelId + { + get { return _channelId; } + } + + /// <summary> Provides the connection that this channel runs over. </summary> + public AMQConnection Connection + { + get { return _connection; } + } + + /// <summary> + /// Declare a new exchange. + /// </summary> + /// <param name="exchangeName">Name of the exchange</param> + /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param> + public void DeclareExchange(String exchangeName, String exchangeClass) + { + _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); + + DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); + } + + /// <summary> + /// Declare a new exchange using the default exchange class. + /// </summary> + /// <param name="exchangeName">Name of the exchange</param> + public void DeleteExchange(string exchangeName) + { + throw new NotImplementedException(); + } + + /// <summary> + /// Declare a new queue with the specified set of arguments. + /// </summary> + /// <param name="queueName">Name of the queue</param> + /// <param name="isDurable">True if the queue should be durable</param> + /// <param name="isExclusive">True if the queue should be exclusive to this channel</param> + /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); + } + + /// <summary> + /// Delete a queue with the specifies arguments. + /// </summary> + /// <param name="queueName">Name of the queue to delete</param> + /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param> + /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param> + /// <param name="noWait">If true, the server will not respond to the method</param> + public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) + { + DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); + } + + /// <summary> + /// Generate a new Unique name to use for a queue. + /// </summary> + /// <returns>A unique name to this channel</returns> + public string GenerateUniqueName() + { + string result = _connection.ProtocolSession.GenerateQueueName(); + return Regex.Replace(result, "[^a-z0-9_]", "_"); + } + + /// <summary> + /// Removes all messages from a queue. + /// </summary> + /// <param name="queueName">Name of the queue to delete</param> + /// <param name="noWait">If true, the server will not respond to the method</param> + public void PurgeQueue(string queueName, bool noWait) + { + DoPurgeQueue(queueName, noWait); + } + + /// <summary> + /// Bind a queue to the specified exchange. + /// </summary> + /// <param name="queueName">Name of queue to bind</param> + /// <param name="exchangeName">Name of exchange to bind to</param> + /// <param name="routingKey">Routing key</param> + public void Bind(string queueName, string exchangeName, string routingKey) + { + DoBind(queueName, exchangeName, routingKey, new FieldTable()); + } + + /// <summary> + /// Bind a queue to the specified exchange. + /// </summary> + /// <param name="queueName">Name of queue to bind</param> + /// <param name="exchangeName">Name of exchange to bind to</param> + /// <param name="routingKey">Routing key</param> + /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param> + public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) + { + DoBind(queueName, exchangeName, routingKey, (FieldTable)args); + } + + /// <summary> + /// Create a new empty message with no body. + /// </summary> + /// <returns>The new message</returns> + public IMessage CreateMessage() + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + /// <summary> + /// Create a new message of the specified MIME type. + /// </summary> + /// <param name="mimeType">The mime type to create</param> + /// <returns>The new message</returns> + public IMessage CreateMessage(string mimeType) + { + return _messageFactoryRegistry.CreateMessage(mimeType); + } + + /// <summary> + /// Creates a new message for bytes (application/octet-stream). + /// </summary> + /// <returns>The new message</returns> + public IBytesMessage CreateBytesMessage() + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + /// <summary> + /// Creates a new text message (text/plain) with empty content. + /// </summary> + /// <returns>The new message</returns> + public ITextMessage CreateTextMessage() + { + return CreateTextMessage(String.Empty); + } + + /// <summary> + /// Creates a new text message (text/plain) with a body. + /// </summary> + /// <param name="text">Initial body of the message</param> + /// <returns>The new message</returns> + public ITextMessage CreateTextMessage(string text) + { + ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); + msg.Text = text; + return msg; + } + + /// <summary> + /// Creates a new Consumer using the builder pattern. + /// </summary> + /// <param name="queueName">Name of queue to receive messages from</param> + /// <returns>The builder object</returns> + public MessageConsumerBuilder CreateConsumerBuilder(string queueName) + { + return new MessageConsumerBuilder(this, queueName); + } + + /// <summary> + /// Creates a new consumer. + /// </summary> + /// <param name="queueName">Name of queue to receive messages from</param> + /// <param name="prefetchLow">Low prefetch value</param> + /// <param name="prefetchHigh">High prefetch value</param> + /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param> + /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param> + /// <returns>The new consumer</returns> + public IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive) + { + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); + + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive); + } + + /// <summary> + /// Unsubscribe from a queue. + /// </summary> + /// <param name="subscriptionName">Subscription name</param> + public void Unsubscribe(String name) + { + throw new NotImplementedException(); + } + + /// <summary> + /// Create a new message publisher using the builder pattern. + /// </summary> + /// <returns>The builder object</returns> + public MessagePublisherBuilder CreatePublisherBuilder() + { + return new MessagePublisherBuilder(this); + } + + /// <summary> + /// Create a new message publisher. + /// </summary> + /// <param name="exchangeName">Name of exchange to publish to</param> + /// <param name="routingKey">Routing key</param> + /// <param name="deliveryMode">Default delivery mode</param> + /// <param name="timeToLive">Default TTL time of messages</param> + /// <param name="immediate">If true, sent immediately</param> + /// <param name="mandatory">If true, the broker will return an error + /// (as a connection exception) if the message cannot be delivered</param> + /// <param name="priority">Default message priority</param> + /// <returns>The new message publisher</returns> + public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", + exchangeName, "none", routingKey)); + + return CreateProducerImpl(exchangeName, routingKey, deliveryMode, + timeToLive, immediate, mandatory, priority); + } + + /// <summary> + /// Recover after transaction failure. + /// </summary> + /// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks> + public void Recover() + { + CheckNotClosed(); + CheckNotTransacted(); + + throw new NotImplementedException(); + } + + /// <summary> + /// Commit the transaction. + /// </summary> + public void Commit() + { + // FIXME: Fail over safety. Needs FailoverSupport? + CheckNotClosed(); + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + // Acknowledge up to message last delivered (if any) for each consumer. + // Need to send ack for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.AcknowledgeDelivered(); + } + + // Commits outstanding messages sent and outstanding acknowledgements. + _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); + } + catch (AMQException e) + { + throw new QpidException("Failed to commit", e); + } + } + + /// <summary> + /// Rollback the transaction. + /// </summary> + public void Rollback() + { + lock (_suspensionLock) + { + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + bool suspended = IsSuspended; + if (!suspended) + { + Suspend(true); + } + + // Reject up to message last delivered (if any) for each consumer. + // Need to send reject for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.RejectUnacked(); + } + + _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + + if ( !suspended ) + { + Suspend(false); + } + } + catch (AMQException e) + { + throw new QpidException("Failed to rollback", e); + } + } + } + + /// <summary> + /// Create a disconnected channel that will fault + /// for most things, but is useful for testing + /// </summary> + /// <returns>A new disconnected channel</returns> + public static IChannel CreateDisconnectedChannel() + { + return new AmqChannel(); + } + + public override void Close() + { + lock (_connection.FailoverMutex) + { + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session + + lock (_closingLock) + { + SetClosed(); + + // we pass null since this is not an error case + CloseProducersAndConsumers(null); + + try + { + _connection.CloseSession(this); + } + catch (AMQException e) + { + throw new QpidException("Error closing session: " + e); + } + finally + { + _connection.DeregisterSession(_channelId); + } + } + } + } + + /** + * Called when the server initiates the closure of the session + * unilaterally. + * @param e the exception that caused this session to be closed. Null causes the + */ + public void ClosedWithException(Exception e) + { + lock (_connection.FailoverMutex) + { + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + SetClosed(); + AMQException amqe; + + if (e is AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + + _connection.DeregisterSession(_channelId); + CloseProducersAndConsumers(amqe); + } + } + + public void MessageReceived(UnprocessedMessage message) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Message received in session with channel id " + _channelId); + } + + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } + else + { + _queue.Enqueue(message); + } + } + + public void Dispose() + { + Close(); + } + + private void SetClosed() + { + Interlocked.Exchange(ref _closed, CLOSED); + } + + /// <summary> + /// Close all producers or consumers. This is called either in the error case or when closing the session normally. + /// <param name="amqe">the exception, may be null to indicate no error has occurred</param> + /// + private void CloseProducersAndConsumers(AMQException amqe) + { + try + { + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + CloseConsumers(amqe); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + /// <summary> + /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is + /// currently no way of propagating errors to message producers (this is a JMS limitation). + /// </summary> + private void CloseProducers() + { + _logger.Debug("Closing producers on session " + this); + + // we need to clone the list of producers since the close() method updates the _producers collection + // which would result in a concurrent modification exception + ArrayList clonedProducers = new ArrayList(_producers.Values); + + foreach (BasicMessageProducer prod in clonedProducers) + { + _logger.Debug("Closing producer " + prod); + prod.Close(); + } + + // at this point the _producers map is empty + } + + /// <summary> + /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. + /// <param name="error">not null if this is a result of an error occurring at the connection level</param> + private void CloseConsumers(Exception error) + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer con in clonedConsumers) + { + if (error != null) + { + con.NotifyError(error); + } + else + { + con.Close(); + } + } + + // at this point the _consumers map will be empty + } + + private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, + DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + lock (_closingLock) + { + CheckNotClosed(); + + try + { + return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId, + this, GetNextProducerId(), + deliveryMode, timeToLive, immediate, mandatory, priority); + } + catch (AMQException e) + { + _logger.Error("Error creating message producer: " + e, e); + throw new QpidException("Error creating message producer", e); + } + } + } + + /// <summary> Creates a message consumer on this channel.</summary> + /// + /// <param name="queueName">The name of the queue to attach the consumer to.</param> + /// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param> + /// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param> + /// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param> + /// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param> + /// + /// <return>The message consumer.</return> + private IMessageConsumer CreateConsumerImpl(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive) + { + lock (_closingLock) + { + CheckNotClosed(); + + BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, + _messageFactoryRegistry, this, + prefetchHigh, prefetchLow, exclusive); + try + { + RegisterConsumer(consumer); + } + catch (AMQException e) + { + throw new QpidException("Error registering consumer: " + e, e); + } + + return consumer; + } + } + + private void CheckTransacted() + { + if (!Transacted) + { + throw new InvalidOperationException("Channel is not transacted"); + } + } + + private void CheckNotTransacted() + { + if (Transacted) + { + throw new InvalidOperationException("Channel is transacted"); + } + } + + internal void Start() + { + _dispatcher = new Dispatcher(this); + Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher)); + dispatcherThread.IsBackground = true; + dispatcherThread.Start(); + } + + internal void Stop() + { + Suspend(true); + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + } + + internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer) + { + _consumers[consumerTag] = consumer; + } + + /// <summary> + /// Called by the MessageConsumer when closing, to deregister the consumer from the + /// map from consumerTag to consumer instance. + /// </summary> + /// <param name="consumerTag">the consumer tag, that was broker-generated</param> + internal void DeregisterConsumer(string consumerTag) + { + _consumers.Remove(consumerTag); + } + + internal void RegisterProducer(long producerId, IMessagePublisher publisher) + { + _producers[producerId] = publisher; + } + + internal void DeregisterProducer(long producerId) + { + _producers.Remove(producerId); + } + + private long GetNextProducerId() + { + return ++_nextProducerId; + } + + /** + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after + * failover when the client has veoted resubscription. + * + * The caller of this method must already hold the failover mutex. + */ + internal void MarkClosed() + { + SetClosed(); + _connection.DeregisterSession(_channelId); + MarkClosedProducersAndConsumers(); + } + + private void MarkClosedProducersAndConsumers() + { + try + { + // no need for a markClosed* method in this case since there is no protocol traffic closing a producer + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + MarkClosedConsumers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + private void MarkClosedConsumers() + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer consumer in clonedConsumers) + { + consumer.MarkClosed(); + } + // at this point the _consumers map will be empty + } + + private void DoPurgeQueue(string queueName, bool noWait) + { + try + { + _logger.DebugFormat("PurgeQueue {0}", queueName); + + AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait); + + if (noWait) + _connection.ProtocolWriter.Write(purgeQueue); + else + _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody)); + } + catch (AMQException) + { + throw; + } + } + + /** + * Replays frame on fail over. + * + * @throws AMQException + */ + internal void ReplayOnFailOver() + { + _logger.Debug(string.Format("Replaying frames for channel {0}", _channelId)); + foreach (AMQFrame frame in _replayFrames) + { + _logger.Debug(string.Format("Replaying frame=[{0}]", frame)); + _connection.ProtocolWriter.Write(frame); + } + } + + /// <summary> + /// Callers must hold the failover mutex before calling this method. + /// </summary> + /// <param name="consumer"></param> + private void RegisterConsumer(BasicMessageConsumer consumer) + { + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, + consumer.Exclusive, consumer.AcknowledgeMode); + consumer.ConsumerTag = consumerTag; + _consumers.Add(consumerTag, consumer); + } + + internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) + { + + _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", + queueName, exchangeName, routingKey, args)); + + AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, + queueName, exchangeName, + routingKey, true, args); + _replayFrames.Add(queueBind); + + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueBind); + } + } + + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + + AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, + queueName, tag, noLocal, + acknowledgeMode == AcknowledgeMode.NoAcknowledge, + exclusive, true, new FieldTable()); + + _replayFrames.Add(basicConsume); + + _connection.ProtocolWriter.Write(basicConsume); + return tag; + } + + private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) + { + try + { + _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); + + AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); + + _replayFrames.Add(queueDelete); + + if (noWait) + { + _connection.ProtocolWriter.Write(queueDelete); + } + else + { + _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); + } + } + catch (AMQException) + { + throw; + } + } + + private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", + queueName, isDurable, isExclusive, isAutoDelete)); + + AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, + isAutoDelete, true, null); + + _replayFrames.Add(queueDeclare); + + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueDeclare); + } + } + + // AMQP-level method. + private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, + string exchangeClass, bool passive, bool durable, + bool autoDelete, bool xinternal, bool noWait, FieldTable args) + { + _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", + _channelId, exchangeName, exchangeClass)); + + AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, + durable, autoDelete, xinternal, noWait, args); + + _replayFrames.Add(declareExchange); + + if (noWait) + { + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(declareExchange); + } + } + else + { + throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); + //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); + } + } + + /** + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from + * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is + * AUTO_ACK or similar. + * + * @param deliveryTag the tag of the last message to be acknowledged + * @param multiple if true will acknowledge all messages up to and including the one specified by the + * delivery tag + */ + internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) + { + AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + } + // FIXME: lock FailoverMutex here? + _connection.ProtocolWriter.Write(ackFrame); + } + + public void RejectMessage(ulong deliveryTag, bool requeue) + { + if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted)) + { + AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue); + _connection.ProtocolWriter.Write(rejectFrame); + } + } + + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + + if (errorCode == AMQConstant.NO_CONSUMERS.Code) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } + else if (errorCode == AMQConstant.NO_ROUTE.Code) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } + else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + + _connection.ExceptionReceived(exception); + } + catch (Exception ex) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + } + + private void OnPrefetchLowMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); + Suspend(false); + } + } + + private void OnPrefetchHighMark(int count) + { + if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) + { + _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); + Suspend(true); + } + } + + private void Suspend(bool suspend) + { + lock (_suspensionLock) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } + + _suspended = suspend; + AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); + + Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); + } + } + + /// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers. + /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message + /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new + /// message. + /// + /// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be + /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a + /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition. + /// + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/> + /// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/> + /// </table> + /// </summary> + /// + /// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message. + /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on + /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks> + /// + /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should + /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks> + private class Dispatcher + { + /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary> + private int _stopped = 0; + + /// <summary> The channel for which this is a dispatcher. </summary> + private AmqChannel _containingChannel; + + /// <summary> Creates a dispatcher on the specified channel. </summary> + /// + /// <param name="containingChannel"> The channel on which this is a dispatcher. </param> + public Dispatcher(AmqChannel containingChannel) + { + _containingChannel = containingChannel; + } + + /// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and + /// the connection of bounced messages.</summary> + public void RunDispatcher() + { + UnprocessedMessage message; + + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) + { + if (message.DeliverBody != null) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + + if (consumer == null) + { + _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); + } + else + { + consumer.NotifyMessage(message, _containingChannel.ChannelId); + } + } + else + { + try + { + // Bounced message is processed here, away from the transport thread + AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. + CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + catch (Exception e) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + } + + _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + "."); + } + + /// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary> + public void StopDispatcher() + { + Interlocked.Exchange(ref _stopped, 1); + } + } + } +} |