summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client/Client/BasicMessageConsumer.cs')
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs59
1 files changed, 57 insertions, 2 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index d0e0473b14..1c9a009174 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -98,6 +98,11 @@ namespace Qpid.Client
private AmqChannel _channel;
+ /// <summary>
+ /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+ /// </summary>
+ private long _lastDeliveryTag;
+
public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
MessageFactoryRegistry messageFactory, AmqChannel channel)
{
@@ -167,7 +172,13 @@ namespace Qpid.Client
{
o = _synchronousQueue.DequeueBlocking();
}
- return ReturnMessageOrThrow(o);
+
+ IMessage m = ReturnMessageOrThrow(o);
+ if (m != null)
+ {
+ PostDeliver(m);
+ }
+ return m;
}
finally
{
@@ -222,7 +233,7 @@ namespace Qpid.Client
/// <returns> a message only if o is a Message</returns>
/// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not
/// a QpidMessagingException is created with the linked exception set appropriately</exception>
- private IMessage ReturnMessageOrThrow(object o)
+ private IMessage ReturnMessageOrThrow(object o)
{
// errors are passed via the queue too since there is no way of interrupting the poll() via the API.
if (o is Exception)
@@ -397,5 +408,49 @@ namespace Qpid.Client
{
get { return _queueName; }
}
+
+ /// <summary>
+ /// Acknowledge up to last message delivered (if any). Used when commiting.
+ /// </summary>
+ internal void AcknowledgeLastDelivered()
+ {
+ if (_lastDeliveryTag > 0)
+ {
+ _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+ _lastDeliveryTag = -1;
+ }
+ }
+
+ private void PostDeliver(IMessage m)
+ {
+ AbstractQmsMessage msg = (AbstractQmsMessage) m;
+ switch (_acknowledgeMode)
+ {
+/* TODO
+ case AcknowledgeMode.DupsOkAcknowledge:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
+ {
+ _channel.AcknowledgeMessage(msg.getDeliveryTag(), true);
+ }
+ break;
+ */
+ case AcknowledgeMode.AutoAcknowledge:
+ _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+ break;
+ case AcknowledgeMode.SessionTransacted:
+ _lastDeliveryTag = msg.DeliveryTag;
+ break;
+ }
+ }
+
}
}