diff options
author | Steven Shaw <steshaw@apache.org> | 2006-11-28 23:37:52 +0000 |
---|---|---|
committer | Steven Shaw <steshaw@apache.org> | 2006-11-28 23:37:52 +0000 |
commit | 90218314a8ad1ed4bfb7c6dd1cbb23ee67bd502c (patch) | |
tree | d6c85cf5631b18c507e10cc9d1dc0b732185a496 /qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | |
parent | d83a0e745a70209785c12a8da1291cdcf6d0c1cd (diff) | |
download | qpid-python-90218314a8ad1ed4bfb7c6dd1cbb23ee67bd502c.tar.gz |
QPID-135 Ported enough transaction support to run FailoverTxTest. Still has same problem as the Java client in that on fail-over the "transaction" continues but the earlier part of the transaction is forgotten.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@480283 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index d0e0473b14..1c9a009174 100644 --- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -98,6 +98,11 @@ namespace Qpid.Client private AmqChannel _channel; + /// <summary> + /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. + /// </summary> + private long _lastDeliveryTag; + public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, MessageFactoryRegistry messageFactory, AmqChannel channel) { @@ -167,7 +172,13 @@ namespace Qpid.Client { o = _synchronousQueue.DequeueBlocking(); } - return ReturnMessageOrThrow(o); + + IMessage m = ReturnMessageOrThrow(o); + if (m != null) + { + PostDeliver(m); + } + return m; } finally { @@ -222,7 +233,7 @@ namespace Qpid.Client /// <returns> a message only if o is a Message</returns> /// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not /// a QpidMessagingException is created with the linked exception set appropriately</exception> - private IMessage ReturnMessageOrThrow(object o) + private IMessage ReturnMessageOrThrow(object o) { // errors are passed via the queue too since there is no way of interrupting the poll() via the API. if (o is Exception) @@ -397,5 +408,49 @@ namespace Qpid.Client { get { return _queueName; } } + + /// <summary> + /// Acknowledge up to last message delivered (if any). Used when commiting. + /// </summary> + internal void AcknowledgeLastDelivered() + { + if (_lastDeliveryTag > 0) + { + _channel.AcknowledgeMessage(_lastDeliveryTag, true); + _lastDeliveryTag = -1; + } + } + + private void PostDeliver(IMessage m) + { + AbstractQmsMessage msg = (AbstractQmsMessage) m; + switch (_acknowledgeMode) + { +/* TODO + case AcknowledgeMode.DupsOkAcknowledge: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + + if (_dups_ok_acknowledge_send) + { + _channel.AcknowledgeMessage(msg.getDeliveryTag(), true); + } + break; + */ + case AcknowledgeMode.AutoAcknowledge: + _channel.AcknowledgeMessage(msg.DeliveryTag, false); + break; + case AcknowledgeMode.SessionTransacted: + _lastDeliveryTag = msg.DeliveryTag; + break; + } + } + } } |