summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Client/Client
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-28 23:37:52 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-28 23:37:52 +0000
commit90218314a8ad1ed4bfb7c6dd1cbb23ee67bd502c (patch)
treed6c85cf5631b18c507e10cc9d1dc0b732185a496 /qpid/dotnet/Qpid.Client/Client
parentd83a0e745a70209785c12a8da1291cdcf6d0c1cd (diff)
downloadqpid-python-90218314a8ad1ed4bfb7c6dd1cbb23ee67bd502c.tar.gz
QPID-135 Ported enough transaction support to run FailoverTxTest. Still has same problem as the Java client in that on fail-over the "transaction" continues but the earlier part of the transaction is forgotten.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@480283 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client')
-rw-r--r--qpid/dotnet/Qpid.Client/Client/AmqChannel.cs50
-rw-r--r--qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs59
-rw-r--r--qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs25
3 files changed, 109 insertions, 25 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
index 6b1ee204b5..b7c8b1857e 100644
--- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -276,20 +276,23 @@ namespace Qpid.Client
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a transacted session
- /*Channel.Commit frame = new Channel.Commit();
- frame.channelId = _channelId;
- frame.confirmTag = 1;*/
+ try
+ {
+ // Acknowledge up to message last delivered (if any) for each consumer.
+ // Need to send ack for messages delivered to consumers so far.
+ foreach (BasicMessageConsumer consumer in _consumers.Values)
+ {
+ // Sends acknowledgement to server.
+ consumer.AcknowledgeLastDelivered();
+ }
- // try
- // {
- // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId));
- // }
- // catch (AMQException e)
- // {
- // throw new JMSException("Error creating session: " + e);
- // }
- throw new NotImplementedException();
- //_logger.Info("Transaction commited on channel " + _channelId);
+ // Commits outstanding messages sent and outstanding acknowledgements.
+ _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody));
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Failed to commit", e);
+ }
}
public void Rollback()
@@ -978,5 +981,26 @@ namespace Qpid.Client
// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
}
}
+
+ /**
+ * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
+ * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
+ * AUTO_ACK or similar.
+ *
+ * @param deliveryTag the tag of the last message to be acknowledged
+ * @param multiple if true will acknowledge all messages up to and including the one specified by the
+ * delivery tag
+ */
+ public void AcknowledgeMessage(long deliveryTag, bool multiple)
+ {
+ // XXX: cast to ulong evil?
+ AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, (ulong)deliveryTag, multiple);
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ }
+ // FIXME: lock FailoverMutex here?
+ _connection.ProtocolWriter.Write(ackFrame);
+ }
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index d0e0473b14..1c9a009174 100644
--- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -98,6 +98,11 @@ namespace Qpid.Client
private AmqChannel _channel;
+ /// <summary>
+ /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+ /// </summary>
+ private long _lastDeliveryTag;
+
public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
MessageFactoryRegistry messageFactory, AmqChannel channel)
{
@@ -167,7 +172,13 @@ namespace Qpid.Client
{
o = _synchronousQueue.DequeueBlocking();
}
- return ReturnMessageOrThrow(o);
+
+ IMessage m = ReturnMessageOrThrow(o);
+ if (m != null)
+ {
+ PostDeliver(m);
+ }
+ return m;
}
finally
{
@@ -222,7 +233,7 @@ namespace Qpid.Client
/// <returns> a message only if o is a Message</returns>
/// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not
/// a QpidMessagingException is created with the linked exception set appropriately</exception>
- private IMessage ReturnMessageOrThrow(object o)
+ private IMessage ReturnMessageOrThrow(object o)
{
// errors are passed via the queue too since there is no way of interrupting the poll() via the API.
if (o is Exception)
@@ -397,5 +408,49 @@ namespace Qpid.Client
{
get { return _queueName; }
}
+
+ /// <summary>
+ /// Acknowledge up to last message delivered (if any). Used when commiting.
+ /// </summary>
+ internal void AcknowledgeLastDelivered()
+ {
+ if (_lastDeliveryTag > 0)
+ {
+ _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+ _lastDeliveryTag = -1;
+ }
+ }
+
+ private void PostDeliver(IMessage m)
+ {
+ AbstractQmsMessage msg = (AbstractQmsMessage) m;
+ switch (_acknowledgeMode)
+ {
+/* TODO
+ case AcknowledgeMode.DupsOkAcknowledge:
+ if (++_outstanding >= _prefetchHigh)
+ {
+ _dups_ok_acknowledge_send = true;
+ }
+ if (_outstanding <= _prefetchLow)
+ {
+ _dups_ok_acknowledge_send = false;
+ }
+
+ if (_dups_ok_acknowledge_send)
+ {
+ _channel.AcknowledgeMessage(msg.getDeliveryTag(), true);
+ }
+ break;
+ */
+ case AcknowledgeMode.AutoAcknowledge:
+ _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+ break;
+ case AcknowledgeMode.SessionTransacted:
+ _lastDeliveryTag = msg.DeliveryTag;
+ break;
+ }
+ }
+
}
}
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
index eb34fa45db..81499e31fe 100644
--- a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
+++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
@@ -31,22 +31,27 @@ namespace Qpid.Client.Message
/// </summary>
protected AmqChannel _channel;
- public AMQMessage(IContentHeaderProperties properties)
+ private long _deliveryTag;
+
+ public AMQMessage(IContentHeaderProperties properties, long deliveryTag)
{
_contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
}
- public AmqChannel Channel
+ public AMQMessage(IContentHeaderProperties properties) : this(properties, -1)
+ {
+ }
+
+ public long DeliveryTag
{
- get
- {
- return _channel;
- }
+ get { return _deliveryTag; }
+ }
- set
- {
- _channel = value;
- }
+ public AmqChannel Channel
+ {
+ get { return _channel; }
+ set { _channel = value; }
}
}
}