diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index d0e0473b14..1c9a009174 100644 --- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -98,6 +98,11 @@ namespace Qpid.Client private AmqChannel _channel; + /// <summary> + /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. + /// </summary> + private long _lastDeliveryTag; + public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, MessageFactoryRegistry messageFactory, AmqChannel channel) { @@ -167,7 +172,13 @@ namespace Qpid.Client { o = _synchronousQueue.DequeueBlocking(); } - return ReturnMessageOrThrow(o); + + IMessage m = ReturnMessageOrThrow(o); + if (m != null) + { + PostDeliver(m); + } + return m; } finally { @@ -222,7 +233,7 @@ namespace Qpid.Client /// <returns> a message only if o is a Message</returns> /// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not /// a QpidMessagingException is created with the linked exception set appropriately</exception> - private IMessage ReturnMessageOrThrow(object o) + private IMessage ReturnMessageOrThrow(object o) { // errors are passed via the queue too since there is no way of interrupting the poll() via the API. if (o is Exception) @@ -397,5 +408,49 @@ namespace Qpid.Client { get { return _queueName; } } + + /// <summary> + /// Acknowledge up to last message delivered (if any). Used when commiting. + /// </summary> + internal void AcknowledgeLastDelivered() + { + if (_lastDeliveryTag > 0) + { + _channel.AcknowledgeMessage(_lastDeliveryTag, true); + _lastDeliveryTag = -1; + } + } + + private void PostDeliver(IMessage m) + { + AbstractQmsMessage msg = (AbstractQmsMessage) m; + switch (_acknowledgeMode) + { +/* TODO + case AcknowledgeMode.DupsOkAcknowledge: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + + if (_dups_ok_acknowledge_send) + { + _channel.AcknowledgeMessage(msg.getDeliveryTag(), true); + } + break; + */ + case AcknowledgeMode.AutoAcknowledge: + _channel.AcknowledgeMessage(msg.DeliveryTag, false); + break; + case AcknowledgeMode.SessionTransacted: + _lastDeliveryTag = msg.DeliveryTag; + break; + } + } + } } |