diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/AmqChannel.cs | 50 |
1 files changed, 37 insertions, 13 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs index 6b1ee204b5..b7c8b1857e 100644 --- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -276,20 +276,23 @@ namespace Qpid.Client CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session - /*Channel.Commit frame = new Channel.Commit(); - frame.channelId = _channelId; - frame.confirmTag = 1;*/ + try + { + // Acknowledge up to message last delivered (if any) for each consumer. + // Need to send ack for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.AcknowledgeLastDelivered(); + } - // try - // { - // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId)); - // } - // catch (AMQException e) - // { - // throw new JMSException("Error creating session: " + e); - // } - throw new NotImplementedException(); - //_logger.Info("Transaction commited on channel " + _channelId); + // Commits outstanding messages sent and outstanding acknowledgements. + _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); + } + catch (AMQException e) + { + throw new QpidException("Failed to commit", e); + } } public void Rollback() @@ -978,5 +981,26 @@ namespace Qpid.Client // _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } + + /** + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from + * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is + * AUTO_ACK or similar. + * + * @param deliveryTag the tag of the last message to be acknowledged + * @param multiple if true will acknowledge all messages up to and including the one specified by the + * delivery tag + */ + public void AcknowledgeMessage(long deliveryTag, bool multiple) + { + // XXX: cast to ulong evil? + AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, (ulong)deliveryTag, multiple); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + } + // FIXME: lock FailoverMutex here? + _connection.ProtocolWriter.Write(ackFrame); + } } } |