summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AmqChannel.cs50
1 files changed, 37 insertions, 13 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
index 6b1ee204b5..b7c8b1857e 100644
--- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -276,20 +276,23 @@ namespace Qpid.Client
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a transacted session
- /*Channel.Commit frame = new Channel.Commit();
- frame.channelId = _channelId;
- frame.confirmTag = 1;*/
+ try
+ {
+ // Acknowledge up to message last delivered (if any) for each consumer.
+ // Need to send ack for messages delivered to consumers so far.
+ foreach (BasicMessageConsumer consumer in _consumers.Values)
+ {
+ // Sends acknowledgement to server.
+ consumer.AcknowledgeLastDelivered();
+ }
- // try
- // {
- // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId));
- // }
- // catch (AMQException e)
- // {
- // throw new JMSException("Error creating session: " + e);
- // }
- throw new NotImplementedException();
- //_logger.Info("Transaction commited on channel " + _channelId);
+ // Commits outstanding messages sent and outstanding acknowledgements.
+ _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody));
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Failed to commit", e);
+ }
}
public void Rollback()
@@ -978,5 +981,26 @@ namespace Qpid.Client
// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
}
}
+
+ /**
+ * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
+ * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
+ * AUTO_ACK or similar.
+ *
+ * @param deliveryTag the tag of the last message to be acknowledged
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the
+ * delivery tag
+ */
+ public void AcknowledgeMessage(long deliveryTag, bool multiple)
+ {
+ // XXX: cast to ulong evil?
+ AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, (ulong)deliveryTag, multiple);
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ }
+ // FIXME: lock FailoverMutex here?
+ _connection.ProtocolWriter.Write(ackFrame);
+ }
}
}