diff options
author | Steven Shaw <steshaw@apache.org> | 2006-11-28 23:37:52 +0000 |
---|---|---|
committer | Steven Shaw <steshaw@apache.org> | 2006-11-28 23:37:52 +0000 |
commit | 90218314a8ad1ed4bfb7c6dd1cbb23ee67bd502c (patch) | |
tree | d6c85cf5631b18c507e10cc9d1dc0b732185a496 /qpid/dotnet/Qpid.Client/Client | |
parent | d83a0e745a70209785c12a8da1291cdcf6d0c1cd (diff) | |
download | qpid-python-90218314a8ad1ed4bfb7c6dd1cbb23ee67bd502c.tar.gz |
QPID-135 Ported enough transaction support to run FailoverTxTest. Still has same problem as the Java client in that on fail-over the "transaction" continues but the earlier part of the transaction is forgotten.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@480283 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/AmqChannel.cs | 50 | ||||
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 59 | ||||
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs | 25 |
3 files changed, 109 insertions, 25 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs index 6b1ee204b5..b7c8b1857e 100644 --- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -276,20 +276,23 @@ namespace Qpid.Client CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session - /*Channel.Commit frame = new Channel.Commit(); - frame.channelId = _channelId; - frame.confirmTag = 1;*/ + try + { + // Acknowledge up to message last delivered (if any) for each consumer. + // Need to send ack for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. + consumer.AcknowledgeLastDelivered(); + } - // try - // { - // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId)); - // } - // catch (AMQException e) - // { - // throw new JMSException("Error creating session: " + e); - // } - throw new NotImplementedException(); - //_logger.Info("Transaction commited on channel " + _channelId); + // Commits outstanding messages sent and outstanding acknowledgements. + _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); + } + catch (AMQException e) + { + throw new QpidException("Failed to commit", e); + } } public void Rollback() @@ -978,5 +981,26 @@ namespace Qpid.Client // _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); } } + + /** + * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from + * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is + * AUTO_ACK or similar. + * + * @param deliveryTag the tag of the last message to be acknowledged + * @param multiple if true will acknowledge all messages up to and including the one specified by the + * delivery tag + */ + public void AcknowledgeMessage(long deliveryTag, bool multiple) + { + // XXX: cast to ulong evil? + AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, (ulong)deliveryTag, multiple); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + } + // FIXME: lock FailoverMutex here? + _connection.ProtocolWriter.Write(ackFrame); + } } } diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index d0e0473b14..1c9a009174 100644 --- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -98,6 +98,11 @@ namespace Qpid.Client private AmqChannel _channel; + /// <summary> + /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. + /// </summary> + private long _lastDeliveryTag; + public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, MessageFactoryRegistry messageFactory, AmqChannel channel) { @@ -167,7 +172,13 @@ namespace Qpid.Client { o = _synchronousQueue.DequeueBlocking(); } - return ReturnMessageOrThrow(o); + + IMessage m = ReturnMessageOrThrow(o); + if (m != null) + { + PostDeliver(m); + } + return m; } finally { @@ -222,7 +233,7 @@ namespace Qpid.Client /// <returns> a message only if o is a Message</returns> /// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not /// a QpidMessagingException is created with the linked exception set appropriately</exception> - private IMessage ReturnMessageOrThrow(object o) + private IMessage ReturnMessageOrThrow(object o) { // errors are passed via the queue too since there is no way of interrupting the poll() via the API. if (o is Exception) @@ -397,5 +408,49 @@ namespace Qpid.Client { get { return _queueName; } } + + /// <summary> + /// Acknowledge up to last message delivered (if any). Used when commiting. + /// </summary> + internal void AcknowledgeLastDelivered() + { + if (_lastDeliveryTag > 0) + { + _channel.AcknowledgeMessage(_lastDeliveryTag, true); + _lastDeliveryTag = -1; + } + } + + private void PostDeliver(IMessage m) + { + AbstractQmsMessage msg = (AbstractQmsMessage) m; + switch (_acknowledgeMode) + { +/* TODO + case AcknowledgeMode.DupsOkAcknowledge: + if (++_outstanding >= _prefetchHigh) + { + _dups_ok_acknowledge_send = true; + } + if (_outstanding <= _prefetchLow) + { + _dups_ok_acknowledge_send = false; + } + + if (_dups_ok_acknowledge_send) + { + _channel.AcknowledgeMessage(msg.getDeliveryTag(), true); + } + break; + */ + case AcknowledgeMode.AutoAcknowledge: + _channel.AcknowledgeMessage(msg.DeliveryTag, false); + break; + case AcknowledgeMode.SessionTransacted: + _lastDeliveryTag = msg.DeliveryTag; + break; + } + } + } } diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs index eb34fa45db..81499e31fe 100644 --- a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs +++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs @@ -31,22 +31,27 @@ namespace Qpid.Client.Message /// </summary> protected AmqChannel _channel; - public AMQMessage(IContentHeaderProperties properties) + private long _deliveryTag; + + public AMQMessage(IContentHeaderProperties properties, long deliveryTag) { _contentHeaderProperties = properties; + _deliveryTag = deliveryTag; } - public AmqChannel Channel + public AMQMessage(IContentHeaderProperties properties) : this(properties, -1) + { + } + + public long DeliveryTag { - get - { - return _channel; - } + get { return _deliveryTag; } + } - set - { - _channel = value; - } + public AmqChannel Channel + { + get { return _channel; } + set { _channel = value; } } } } |