diff options
Diffstat (limited to 'dotnet/Qpid.Client/Client/BasicMessageConsumer.cs')
-rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 103 |
1 files changed, 29 insertions, 74 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 611a4e5351..133707c609 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -82,10 +82,15 @@ namespace Qpid.Client /// </summary> private readonly object _syncLock = new object(); - /** - * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover - */ - private int _prefetch; + /// <summary> + /// We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchHigh; + + /// <summary> + /// We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchLow; /// <summary> /// When true indicates that either a message listener is set or that @@ -108,8 +113,20 @@ namespace Qpid.Client /// </summary> private long _lastDeliveryTag; - public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, - MessageFactoryRegistry messageFactory, AmqChannel channel) + /// <summary> + /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode + /// </summary> + private int _outstanding; + + /// <summary> + /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. + /// Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + /// </summary> + private bool _dups_ok_acknowledge_send; + + internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, + MessageFactoryRegistry messageFactory, AmqChannel channel, + int prefetchHigh, int prefetchLow, bool exclusive) { _channelId = channelId; _queueName = queueName; @@ -117,6 +134,9 @@ namespace Qpid.Client _messageFactory = messageFactory; _channel = channel; _acknowledgeMode = _channel.AcknowledgeMode; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; + _exclusive = exclusive; } #region IMessageConsumer Members @@ -302,65 +322,6 @@ namespace Qpid.Client } } -// /// <summary> -// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case -// /// of a message listener or a synchronous receive() caller. -// /// </summary> -// /// <param name="messageFrame">the raw unprocessed mesage</param> -// /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param> -// /// <param name="channelId">channel on which this message was sent</param> -// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) -// { -// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); -// if (_logger.IsDebugEnabled) -// { -// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); -// } -// try -// { -// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, -// messageFrame.DeliverBody.Redelivered, -// messageFrame.ContentHeader, -// messageFrame.Bodies); - -// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) -// { -// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); -// }*/ -// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) -// { -// // we set the session so that when the user calls acknowledge() it can call the method on session -// // to send out the appropriate frame -// jmsMessage.Channel = _channel; -// } - -// lock (_syncLock) -// { -// if (_messageListener != null) -// { -//#if __MonoCS__ -// _messageListener(jmsMessage); -//#else -// _messageListener.Invoke(jmsMessage); -//#endif -// } -// else -// { -// _synchronousQueue.Enqueue(jmsMessage); -// } -// } -// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) -// { -// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); -// } -// } -// catch (Exception e) -// { -// _logger.Error("Caught exception (dump follows) - ignoring...", e); -// } -// } - - /** * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case * of a message listener or a synchronous receive() caller. @@ -465,11 +426,6 @@ namespace Qpid.Client DeregisterConsumer(); } - public int Prefetch - { - get { return _prefetch; } - } - public string QueueName { get { return _queueName; } @@ -509,7 +465,6 @@ namespace Qpid.Client AbstractQmsMessage msg = (AbstractQmsMessage) m; switch (AcknowledgeMode) { -/* TODO case AcknowledgeMode.DupsOkAcknowledge: if (++_outstanding >= _prefetchHigh) { @@ -519,16 +474,16 @@ namespace Qpid.Client { _dups_ok_acknowledge_send = false; } - if (_dups_ok_acknowledge_send) { - _channel.AcknowledgeMessage(msg.getDeliveryTag(), true); + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); } break; - */ + case AcknowledgeMode.AutoAcknowledge: _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); break; + case AcknowledgeMode.SessionTransacted: _lastDeliveryTag = msg.DeliveryTag; break; |