diff options
Diffstat (limited to 'Final/dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r-- | Final/dotnet/Qpid.Client/Client/AmqChannel.cs | 1009 |
1 files changed, 1009 insertions, 0 deletions
diff --git a/Final/dotnet/Qpid.Client/Client/AmqChannel.cs b/Final/dotnet/Qpid.Client/Client/AmqChannel.cs new file mode 100644 index 0000000000..84f08729dd --- /dev/null +++ b/Final/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -0,0 +1,1009 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text.RegularExpressions; +using System.Threading; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Message; +using Apache.Qpid.Client.Util; +using Apache.Qpid.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Protocol; + +namespace Apache.Qpid.Client +{ + public class AmqChannel : Closeable, IChannel + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); + + internal const int BASIC_CONTENT_TYPE = 60; + + private static int _nextSessionNumber = 0; + + private int _sessionNumber; + private bool _suspended; + private object _suspensionLock = new object(); + + // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. + private int _nextConsumerNumber = 1; + + public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; + public const int DEFAULT_PREFETCH_LOW_MARK = 2500; + + private AMQConnection _connection; + + private bool _transacted; + + private AcknowledgeMode _acknowledgeMode; + + private ushort _channelId; + + private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; + private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + + private FlowControlQueue _queue; + + private Dispatcher _dispatcher; + + private MessageFactoryRegistry _messageFactoryRegistry; + + /// <summary> + /// Set of all producers created by this session + /// </summary> + private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); + + /// <summary> + /// Maps from consumer tag to JMSMessageConsumer instance + /// </summary> + private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); + + private ArrayList _replayFrames = new ArrayList(); + + /// <summary> + /// The counter of the _next producer id. This id is generated by the session and used only to allow the + /// producer to identify itself to the session when deregistering itself. + /// + /// Access to this id does not require to be synchronized since according to the JMS specification only one + /// thread of control is allowed to create producers for any given session instance. + /// </summary> + private long _nextProducerId; + + /// <summary> + /// Responsible for decoding a message fragment and passing it to the appropriate message consumer. + /// </summary> + private class Dispatcher + { + private int _stopped = 0; + + private AmqChannel _containingChannel; + + public Dispatcher(AmqChannel containingChannel) + { + _containingChannel = containingChannel; + } + + /// <summary> + /// Runs the dispatcher. This is intended to be Run in a separate thread. + /// </summary> + public void RunDispatcher() + { + UnprocessedMessage message; + + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) + { + //_queue.size() + DispatchMessage(message); + } + + _logger.Debug("Dispatcher thread terminating for channel " + _containingChannel._channelId); + } + + private void DispatchMessage(UnprocessedMessage message) + { + if (message.DeliverBody != null) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + + if (consumer == null) + { + _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); + } + else + { + consumer.NotifyMessage(message, _containingChannel.ChannelId); + } + } + else + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. + CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + catch (Exception e) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + } + + public void StopDispatcher() + { + Interlocked.Exchange(ref _stopped, 1); + } + } + + /// <summary> + /// Initializes a new instance of the <see cref="AmqChannel"/> class. + /// </summary> + /// <param name="con">The connection.</param> + /// <param name="channelId">The channel id.</param> + /// <param name="transacted">if set to <c>true</c> [transacted].</param> + /// <param name="acknowledgeMode">The acknowledge mode.</param> + /// <param name="defaultPrefetchHigh">Default prefetch high value</param> + /// <param name="defaultPrefetchLow">Default prefetch low value</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) + : this() + { + _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); + _connection = con; + _transacted = transacted; + if ( transacted ) + { + _acknowledgeMode = AcknowledgeMode.SessionTransacted; + } else + { + _acknowledgeMode = acknowledgeMode; + } + _channelId = channelId; + _defaultPrefetchHighMark = defaultPrefetchHigh; + _defaultPrefetchLowMark = defaultPrefetchLow; + + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _queue = new FlowControlQueue( + _defaultPrefetchLowMark, _defaultPrefetchHighMark, + new ThresholdMethod(OnPrefetchLowMark), + new ThresholdMethod(OnPrefetchHighMark) + ); + } else + { + // low and upper are the same + _queue = new FlowControlQueue( + _defaultPrefetchHighMark, _defaultPrefetchHighMark, + null, null + ); + } + } + + private AmqChannel() + { + _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); + } + + /// <summary> + /// Create a disconnected channel that will fault + /// for most things, but is useful for testing + /// </summary> + /// <returns>A new disconnected channel</returns> + public static IChannel CreateDisconnectedChannel() + { + return new AmqChannel(); + } + + + public IBytesMessage CreateBytesMessage() + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + public IMessage CreateMessage() + { + // TODO: this is supposed to create a message consisting only of message headers + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + + public IMessage CreateMessage(string mimeType) + { + return _messageFactoryRegistry.CreateMessage(mimeType); + } + + public ITextMessage CreateTextMessage() + { + return CreateTextMessage(String.Empty); + } + + public ITextMessage CreateTextMessage(string text) + { + ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); + msg.Text = text; + return msg; + } + + public bool Transacted + { + get + { + CheckNotClosed(); + return _transacted; + } + } + + public AcknowledgeMode AcknowledgeMode + { + get + { + CheckNotClosed(); + return _acknowledgeMode; + } + } + + public void Commit() + { + // FIXME: Fail over safety. Needs FailoverSupport? + CheckNotClosed(); + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + // Acknowledge up to message last delivered (if any) for each consumer. + // Need to send ack for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.AcknowledgeLastDelivered(); + } + + // Commits outstanding messages sent and outstanding acknowledgements. + _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); + } + catch (AMQException e) + { + throw new QpidException("Failed to commit", e); + } + } + + public void Rollback() + { + lock ( _suspensionLock ) + { + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + try + { + bool suspended = IsSuspended; + if ( !suspended ) + Suspend(true); + + // todo: rollback dispatcher when TX support is added + //if ( _dispatcher != null ) + // _dispatcher.Rollback(); + + _connection.ConvenientProtocolWriter.SyncWrite( + TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + + if ( !suspended ) + Suspend(false); + } catch ( AMQException e ) + { + throw new QpidException("Failed to rollback", e); + } + } + } + + public override void Close() + { + lock (_connection.FailoverMutex) + { + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session + + lock (_closingLock) + { + SetClosed(); + + // we pass null since this is not an error case + CloseProducersAndConsumers(null); + + try + { + _connection.CloseSession(this); + } + catch (AMQException e) + { + throw new QpidException("Error closing session: " + e); + } + finally + { + _connection.DeregisterSession(_channelId); + } + } + } + } + + private void SetClosed() + { + Interlocked.Exchange(ref _closed, CLOSED); + } + + /// <summary> + /// Close all producers or consumers. This is called either in the error case or when closing the session normally. + /// <param name="amqe">the exception, may be null to indicate no error has occurred</param> + /// + private void CloseProducersAndConsumers(AMQException amqe) + { + try + { + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + CloseConsumers(amqe); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + /** + * Called when the server initiates the closure of the session + * unilaterally. + * @param e the exception that caused this session to be closed. Null causes the + */ + public void ClosedWithException(Exception e) + { + lock (_connection.FailoverMutex) + { + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + SetClosed(); + AMQException amqe; + if (e is AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + _connection.DeregisterSession(_channelId); + CloseProducersAndConsumers(amqe); + } + } + + /// <summary> + /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is + /// currently no way of propagating errors to message producers (this is a JMS limitation). + /// </summary> + private void CloseProducers() + { + _logger.Debug("Closing producers on session " + this); + // we need to clone the list of producers since the close() method updates the _producers collection + // which would result in a concurrent modification exception + ArrayList clonedProducers = new ArrayList(_producers.Values); + + foreach (BasicMessageProducer prod in clonedProducers) + { + _logger.Debug("Closing producer " + prod); + prod.Close(); + } + // at this point the _producers map is empty + } + + /// <summary> + /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. + /// <param name="error">not null if this is a result of an error occurring at the connection level</param> + /// + private void CloseConsumers(Exception error) + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer con in clonedConsumers) + { + if (error != null) + { + con.NotifyError(error); + } + else + { + con.Close(); + } + } + // at this point the _consumers map will be empty + } + + public void Recover() + { + CheckNotClosed(); + CheckNotTransacted(); // throws IllegalOperationException if not a transacted session + + // TODO: This cannot be implemented using 0.8 semantics + throw new NotImplementedException(); + } + + public void Run() + { + throw new NotImplementedException(); + } + + public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", + exchangeName, "none", routingKey)); + return CreateProducerImpl(exchangeName, routingKey, deliveryMode, + timeToLive, immediate, mandatory, priority); + } + + public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, + DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + lock (_closingLock) + { + CheckNotClosed(); + + try + { + return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId, + this, GetNextProducerId(), + deliveryMode, timeToLive, immediate, mandatory, priority); + } + catch (AMQException e) + { + _logger.Error("Error creating message producer: " + e, e); + throw new QpidException("Error creating message producer", e); + } + } + } + + public IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool durable, + string subscriptionName) + { + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName)); + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName); + } + + private IMessageConsumer CreateConsumerImpl(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool durable, + string subscriptionName) + { + if (durable || subscriptionName != null) + { + throw new NotImplementedException(); // TODO: durable subscriptions. + } + + lock (_closingLock) + { + CheckNotClosed(); + + BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, + _messageFactoryRegistry, this, + prefetchHigh, prefetchLow, exclusive); + try + { + RegisterConsumer(consumer); + } + catch (AMQException e) + { + throw new QpidException("Error registering consumer: " + e, e); + } + + return consumer; + } + } + + public void Unsubscribe(String name) + { + throw new NotImplementedException(); // FIXME + } + + private void CheckTransacted() + { + if (!Transacted) + { + throw new InvalidOperationException("Channel is not transacted"); + } + } + + private void CheckNotTransacted() + { + if (Transacted) + { + throw new InvalidOperationException("Channel is transacted"); + } + } + + public void MessageReceived(UnprocessedMessage message) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Message received in session with channel id " + _channelId); + } + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } else + { + _queue.Enqueue(message); + } + } + + public int DefaultPrefetch + { + get { return DefaultPrefetchHigh; } + } + public int DefaultPrefetchLow + { + get { return _defaultPrefetchLowMark; } + } + public int DefaultPrefetchHigh + { + get { return _defaultPrefetchHighMark; } + } + public bool IsSuspended + { + get { return _suspended; } + } + + public ushort ChannelId + { + get + { + return _channelId; + } + } + + public AMQConnection Connection + { + get + { + return _connection; + } + } + + internal void Start() + { + _dispatcher = new Dispatcher(this); + Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher)); + dispatcherThread.IsBackground = true; + dispatcherThread.Start(); + } + + internal void Stop() + { + Suspend(true); + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + } + + internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer) + { + _consumers[consumerTag] = consumer; + } + + /// <summary> + /// Called by the MessageConsumer when closing, to deregister the consumer from the + /// map from consumerTag to consumer instance. + /// </summary> + /// <param name="consumerTag">the consumer tag, that was broker-generated</param> + internal void DeregisterConsumer(string consumerTag) + { + _consumers.Remove(consumerTag); + } + + internal void RegisterProducer(long producerId, IMessagePublisher publisher) + { + _producers[producerId] = publisher; + } + + internal void DeregisterProducer(long producerId) + { + _producers.Remove(producerId); + } + + private long GetNextProducerId() + { + return ++_nextProducerId; + } + + public void Dispose() + { + Close(); + } + + /** + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after + * failover when the client has veoted resubscription. + * + * The caller of this method must already hold the failover mutex. + */ + internal void MarkClosed() + { + SetClosed(); + _connection.DeregisterSession(_channelId); + MarkClosedProducersAndConsumers(); + } + + private void MarkClosedProducersAndConsumers() + { + try + { + // no need for a markClosed* method in this case since there is no protocol traffic closing a producer + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + MarkClosedConsumers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + private void MarkClosedConsumers() + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer consumer in clonedConsumers) + { + consumer.MarkClosed(); + } + // at this point the _consumers map will be empty + } + + public void PurgeQueue(string queueName, bool noWait) + { + DoPurgeQueue(queueName, noWait); + } + + private void DoPurgeQueue(string queueName, bool noWait) + { + try + { + _logger.DebugFormat("PurgeQueue {0}", queueName); + + AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait); + + if (noWait) + _connection.ProtocolWriter.Write(purgeQueue); + else + _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody)); + } + catch (AMQException) + { + throw; + } + } + + /** + * Replays frame on fail over. + * + * @throws AMQException + */ + internal void ReplayOnFailOver() + { + _logger.Debug(string.Format("Replaying frames for channel {0}", _channelId)); + foreach (AMQFrame frame in _replayFrames) + { + _logger.Debug(string.Format("Replaying frame=[{0}]", frame)); + _connection.ProtocolWriter.Write(frame); + } + } + + /// <summary> + /// Callers must hold the failover mutex before calling this method. + /// </summary> + /// <param name="consumer"></param> + void RegisterConsumer(BasicMessageConsumer consumer) + { + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, + consumer.Exclusive, consumer.AcknowledgeMode); + consumer.ConsumerTag = consumerTag; + _consumers.Add(consumerTag, consumer); + } + + public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) + { + DoBind(queueName, exchangeName, routingKey, (FieldTable)args); + } + + public void Bind(string queueName, string exchangeName, string routingKey) + { + DoBind(queueName, exchangeName, routingKey, new FieldTable()); + } + + internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) + { + + _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", + queueName, exchangeName, routingKey, args)); + + AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, + queueName, exchangeName, + routingKey, true, args); + _replayFrames.Add(queueBind); + + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueBind); + } + } + + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + + AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, + queueName, tag, noLocal, + acknowledgeMode == AcknowledgeMode.NoAcknowledge, + exclusive, true, new FieldTable()); + + _replayFrames.Add(basicConsume); + + _connection.ProtocolWriter.Write(basicConsume); + return tag; + } + + public void DeleteExchange(string exchangeName) + { + throw new NotImplementedException(); // FIXME + } + + public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) + { + DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); + } + + private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) + { + try + { + _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); + + AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, + queueName, // queueName + ifUnused, // IfUnUsed + ifEmpty, // IfEmpty + noWait); // NoWait + + _replayFrames.Add(queueDelete); + + if (noWait) + _connection.ProtocolWriter.Write(queueDelete); + else + _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); + } + catch (AMQException) + { + throw; + } + } + + public MessageConsumerBuilder CreateConsumerBuilder(string queueName) + { + return new MessageConsumerBuilder(this, queueName); + } + + public MessagePublisherBuilder CreatePublisherBuilder() + { + return new MessagePublisherBuilder(this); + } + + public string GenerateUniqueName() + { + string result = _connection.ProtocolSession.GenerateQueueName(); + return Regex.Replace(result, "[^a-z0-9_]", "_"); + } + + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); + } + + private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", + queueName, isDurable, isExclusive, isAutoDelete)); + + AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, + false, isDurable, isExclusive, + isAutoDelete, true, null); + + _replayFrames.Add(queueDeclare); + + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(queueDeclare); + } + } + + public void DeclareExchange(String exchangeName, String exchangeClass) + { + _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); + + DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); + } + + // AMQP-level method. + private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, + string exchangeClass, bool passive, bool durable, + bool autoDelete, bool xinternal, bool noWait, FieldTable args) + { + _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", + _channelId, exchangeName, exchangeClass)); + + AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame( + channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); + + _replayFrames.Add(declareExchange); + + if (noWait) + { + lock (_connection.FailoverMutex) + { + _connection.ProtocolWriter.Write(declareExchange); + } + } + else + { + throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); +// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); + } + } + + /** + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from + * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is + * AUTO_ACK or similar. + * + * @param deliveryTag the tag of the last message to be acknowledged + * @param multiple if true will acknowledge all messages up to and including the one specified by the + * delivery tag + */ + internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) + { + AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + } + // FIXME: lock FailoverMutex here? + _connection.ProtocolWriter.Write(ackFrame); + } + + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage( + 0, false, message.ContentHeader, + message.Bodies + ); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } else if ( errorCode == AMQConstant.NO_ROUTE.Code ) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + _connection.ExceptionReceived(exception); + } catch ( Exception ex ) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + + } + + private void OnPrefetchLowMark(int count) + { + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); + Suspend(false); + } + } + private void OnPrefetchHighMark(int count) + { + if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) + { + _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); + Suspend(true); + } + } + + private void Suspend(bool suspend) + { + lock ( _suspensionLock ) + { + if ( _logger.IsDebugEnabled ) + { + _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); + } + + _suspended = suspend; + AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); + + Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); + } + } + + } +} |