diff options
16 files changed, 121 insertions, 149 deletions
diff --git a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs index cf26b42c6a..1b27f920b8 100644 --- a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs +++ b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs @@ -58,8 +58,9 @@ namespace Qpid.Client.Tests _channel.Bind(queueName, _serviceName, null, CreatePatternAsFieldTable()); IMessageConsumer consumer = _channel.CreateConsumerBuilder(queueName) - .withPrefetch(100) - .withNoLocal(true) + .WithPrefetchLow(100) + .WithPrefetchHigh(500) + .WithNoLocal(true) .Create(); consumer.OnMessage = new MessageReceivedDelegate(OnMessage); diff --git a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs index 35f6017f48..c748ef8840 100644 --- a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs +++ b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs @@ -43,8 +43,8 @@ namespace Qpid.Client.Tests try { _publisher = _channel.CreatePublisherBuilder() - .withExchangeName(_commandExchangeName) - .withMandatory(true) + .WithExchangeName(_commandExchangeName) + .WithMandatory(true) .Create(); // Disabling timestamps - a performance optimisation where timestamps and TTL/expiration diff --git a/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs b/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs index 6b6fca20b2..687f08eeef 100644 --- a/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs +++ b/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs @@ -70,8 +70,8 @@ namespace Qpid.Client.Tests { base.Init(); _publisher = _channel.CreatePublisherBuilder() - .withRoutingKey(_commandQueueName) - .withExchangeName(ExchangeNameDefaults.TOPIC) + .WithRoutingKey(_commandQueueName) + .WithExchangeName(ExchangeNameDefaults.TOPIC) .Create(); _publisher.DisableMessageTimestamp = true; @@ -85,7 +85,7 @@ namespace Qpid.Client.Tests _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName); _consumers[i] = _channel.CreateConsumerBuilder(queueName) - .withPrefetch(100).Create(); + .WithPrefetchLow(100).Create(); _consumers[i].OnMessage = new MessageReceivedDelegate(OnMessage); } _connection.Start(); diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs index 952cc2a58e..52ef76c559 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs @@ -78,8 +78,8 @@ namespace Qpid.Client.Tests.failover // _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); _publisher = _channel.CreatePublisherBuilder() - .withRoutingKey(routingKey) - .withExchangeName(exchangeName) + .WithRoutingKey(routingKey) + .WithExchangeName(exchangeName) .Create(); _publisher.Send(msg); } @@ -206,8 +206,8 @@ namespace Qpid.Client.Tests.failover //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); return _session.CreatePublisherBuilder() - .withExchangeName(exchangeName) - .withRoutingKey(routingKey) + .WithExchangeName(exchangeName) + .WithRoutingKey(routingKey) .Create(); } } diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs index 79a04e79eb..4e95c12290 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -36,8 +36,11 @@ namespace Qpid.Client.Tests.failover const int NUM_ITERATIONS = 10; const int NUM_COMMITED_MESSAGES = 10; const int NUM_ROLLEDBACK_MESSAGES = 3; - const int SLEEP_MILLIS = 500; + const int SLEEP_MILLIS = 50; + // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge + AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge; + const bool _noWait = true; // use Receive or ReceiveNoWait AMQConnection _connection; public void OnMessage(IMessage message) @@ -45,6 +48,11 @@ namespace Qpid.Client.Tests.failover try { _log.Info("Received: " + ((ITextMessage) message).Text); + if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) + { + _log.Info("client acknowledging"); + message.Acknowledge(); + } } catch (QpidException e) { @@ -56,11 +64,13 @@ namespace Qpid.Client.Tests.failover { FailoverTxTest _failoverTxTest; IMessageConsumer _consumer; + private bool _noWait; - internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel) + internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait) { _failoverTxTest = failoverTxTest; _consumer = channel; + _noWait = noWait; } internal void Run() @@ -68,7 +78,9 @@ namespace Qpid.Client.Tests.failover int messages = 0; while (messages < NUM_COMMITED_MESSAGES) { - IMessage msg = _consumer.ReceiveNoWait(); + IMessage msg; + if (_noWait) msg = _consumer.ReceiveNoWait(); + else msg = _consumer.Receive(); if (msg != null) { _log.Info("NoWait received message"); @@ -93,7 +105,8 @@ namespace Qpid.Client.Tests.failover _log.Info("connectionInfo = " + connectionInfo); _log.Info("connection.asUrl = " + _connection.toURL()); - IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge); + _log.Info("AcknowledgeMode is " + _acknowledgeMode); + IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); string queueName = receivingChannel.GenerateUniqueName(); @@ -103,17 +116,17 @@ namespace Qpid.Client.Tests.failover // No need to call Queue.Bind as automatically bound to default direct exchange. receivingChannel.Bind(queueName, "amq.direct", queueName); - - IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create(); - bool useThread = true; + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) + .WithPrefetchLow(30) + .WithPrefetchHigh(60).Create(); + bool useThread = false; if (useThread) { - NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer); + NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait); new Thread(noWaitConsumer.Run).Start(); } else { - //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); consumer.OnMessage = new MessageReceivedDelegate(OnMessage); } @@ -133,7 +146,7 @@ namespace Qpid.Client.Tests.failover bool transacted = true; IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder() - .withRoutingKey(routingKey) + .WithRoutingKey(routingKey) .Create(); for (int i = 1; i <= NUM_ITERATIONS; ++i) diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs index bb5758c18c..a52b4e2c50 100644 --- a/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs +++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs @@ -60,8 +60,9 @@ namespace Qpid.Client.Tests _channel.DeclareQueue(_serviceName, false, false, false); IMessageConsumer consumer = _channel.CreateConsumerBuilder(_serviceName) - .withPrefetch(100) - .withNoLocal(true) + .WithPrefetchLow(100) + .WithPrefetchHigh(500) + .WithNoLocal(true) .Create(); consumer.OnMessage = new MessageReceivedDelegate(OnMessage); } @@ -100,8 +101,8 @@ namespace Qpid.Client.Tests // Console.WriteLine("ReplyTo.RoutingKey = " + _replyToRoutingKey); _destinationPublisher = _channel.CreatePublisherBuilder() - .withExchangeName(_replyToExchangeName) - .withRoutingKey(_replyToRoutingKey) + .WithExchangeName(_replyToExchangeName) + .WithRoutingKey(_replyToRoutingKey) .Create(); _destinationPublisher.DisableMessageTimestamp = true; _destinationPublisher.DeliveryMode = DeliveryMode.NonPersistent; diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs index 582f022719..e437b670bf 100644 --- a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs +++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs @@ -53,7 +53,7 @@ namespace Qpid.Client.Tests try { _publisher = _channel.CreatePublisherBuilder() - .withRoutingKey(_commandQueueName) + .WithRoutingKey(_commandQueueName) .Create(); _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder? _publisher.DeliveryMode = DeliveryMode.NonPersistent; // XXX: need a "with" for this in builder? @@ -74,9 +74,10 @@ namespace Qpid.Client.Tests _channel.DeclareQueue(replyQueueName, false, true, true); IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName) - .withPrefetch(100) - .withNoLocal(true) - .withExclusive(true).Create(); + .WithPrefetchLow(100) + .WithPrefetchHigh(200) + .WithNoLocal(true) + .WithExclusive(true).Create(); _startTime = DateTime.Now.Ticks; diff --git a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs index 63c936d667..84ae2c92c1 100644 --- a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs +++ b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs @@ -81,11 +81,11 @@ namespace Qpid.Client.Tests // Send a test message to a non-existant queue on the default exchange. See if message is returned! MessagePublisherBuilder builder = _channel.CreatePublisherBuilder() - .withRoutingKey("Non-existant route key!") - .withMandatory(true); + .WithRoutingKey("Non-existant route key!") + .WithMandatory(true); if (exchangeName != null) { - builder.withExchangeName(exchangeName); + builder.WithExchangeName(exchangeName); } IMessagePublisher publisher = builder.Create(); publisher.Send(_channel.CreateTextMessage("Hiya!")); diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 0ab3fd3411..2ffd6505c6 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -43,7 +43,7 @@ namespace Qpid.Client // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; - internal const int DEFAULT_PREFETCH = 5000; + internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH; private AMQConnection _connection; @@ -273,6 +273,7 @@ namespace Qpid.Client public void Commit() { + // FIXME: Fail over safety. Needs FailoverSupport? CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session @@ -297,6 +298,7 @@ namespace Qpid.Client public void Rollback() { + // FIXME: Fail over safety. Needs FailoverSupport? CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session @@ -489,25 +491,26 @@ namespace Qpid.Client } public IMessageConsumer CreateConsumer(string queueName, - int prefetch, + int prefetchLow, + int prefetchHigh, bool noLocal, bool exclusive, bool durable, string subscriptionName) { - _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}", - queueName, prefetch, noLocal, exclusive, durable, subscriptionName)); - return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName); + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName)); + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName); } private IMessageConsumer CreateConsumerImpl(string queueName, - int prefetch, - bool noLocal, - bool exclusive, - bool durable, - string subscriptionName) + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool durable, + string subscriptionName) { - if (durable || subscriptionName != null) { throw new NotImplementedException(); // TODO: durable subscriptions. @@ -518,7 +521,8 @@ namespace Qpid.Client CheckNotClosed(); BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, - _messageFactoryRegistry, this); + _messageFactoryRegistry, this, + prefetchHigh, prefetchLow, exclusive); try { RegisterConsumer(consumer); @@ -710,9 +714,8 @@ namespace Qpid.Client /// <param name="consumer"></param> void RegisterConsumer(BasicMessageConsumer consumer) { - String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal, + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; _consumers.Add(consumerTag, consumer); } @@ -744,8 +747,7 @@ namespace Qpid.Client } } - private String ConsumeFromQueue(String queueName, int prefetch, - bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) { // Need to generate a consumer tag on the client so we can exploit the nowait flag. String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); @@ -973,7 +975,6 @@ namespace Qpid.Client public void AcknowledgeMessage(ulong deliveryTag, bool multiple) { AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); - _logger.Info("XXX sending ack: " + ackFrame); if (_logger.IsDebugEnabled) { _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 611a4e5351..133707c609 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -82,10 +82,15 @@ namespace Qpid.Client /// </summary> private readonly object _syncLock = new object(); - /** - * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover - */ - private int _prefetch; + /// <summary> + /// We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchHigh; + + /// <summary> + /// We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchLow; /// <summary> /// When true indicates that either a message listener is set or that @@ -108,8 +113,20 @@ namespace Qpid.Client /// </summary> private long _lastDeliveryTag; - public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, - MessageFactoryRegistry messageFactory, AmqChannel channel) + /// <summary> + /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode + /// </summary> + private int _outstanding; + + /// <summary> + /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. + /// Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + /// </summary> + private bool _dups_ok_acknowledge_send; + + internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, + MessageFactoryRegistry messageFactory, AmqChannel channel, + int prefetchHigh, int prefetchLow, bool exclusive) { _channelId = channelId; _queueName = queueName; @@ -117,6 +134,9 @@ namespace Qpid.Client _messageFactory = messageFactory; _channel = channel; _acknowledgeMode = _channel.AcknowledgeMode; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; + _exclusive = exclusive; } #region IMessageConsumer Members @@ -302,65 +322,6 @@ namespace Qpid.Client } } -// /// <summary> -// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case -// /// of a message listener or a synchronous receive() caller. -// /// </summary> -// /// <param name="messageFrame">the raw unprocessed mesage</param> -// /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param> -// /// <param name="channelId">channel on which this message was sent</param> -// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) -// { -// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); -// if (_logger.IsDebugEnabled) -// { -// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); -// } -// try -// { -// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, -// messageFrame.DeliverBody.Redelivered, -// messageFrame.ContentHeader, -// messageFrame.Bodies); - -// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) -// { -// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); -// }*/ -// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) -// { -// // we set the session so that when the user calls acknowledge() it can call the method on session -// // to send out the appropriate frame -// jmsMessage.Channel = _channel; -// } - -// lock (_syncLock) -// { -// if (_messageListener != null) -// { -//#if __MonoCS__ -// _messageListener(jmsMessage); -//#else -// _messageListener.Invoke(jmsMessage); -//#endif -// } -// else -// { -// _synchronousQueue.Enqueue(jmsMessage); -// } -// } -// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) -// { -// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); -// } -// } -// catch (Exception e) -// { -// _logger.Error("Caught exception (dump follows) - ignoring...", e); -// } -// } - - /** * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case * of a message listener or a synchronous receive() caller. @@ -465,11 +426,6 @@ namespace Qpid.Client DeregisterConsumer(); } - public int Prefetch - { - get { return _prefetch; } - } - public string QueueName { get { return _queueName; } @@ -509,7 +465,6 @@ namespace Qpid.Client AbstractQmsMessage msg = (AbstractQmsMessage) m; switch (AcknowledgeMode) { -/* TODO case AcknowledgeMode.DupsOkAcknowledge: if (++_outstanding >= _prefetchHigh) { @@ -519,16 +474,16 @@ namespace Qpid.Client { _dups_ok_acknowledge_send = false; } - if (_dups_ok_acknowledge_send) { - _channel.AcknowledgeMessage(msg.getDeliveryTag(), true); + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); } break; - */ + case AcknowledgeMode.AutoAcknowledge: _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); break; + case AcknowledgeMode.SessionTransacted: _lastDeliveryTag = msg.DeliveryTag; break; diff --git a/dotnet/Qpid.Common/Collections/BlockingQueue.cs b/dotnet/Qpid.Common/Collections/BlockingQueue.cs index 7adf6c3af2..71fc9bf518 100644 --- a/dotnet/Qpid.Common/Collections/BlockingQueue.cs +++ b/dotnet/Qpid.Common/Collections/BlockingQueue.cs @@ -24,7 +24,7 @@ using System.Collections; namespace Qpid.Collections { public abstract class BlockingQueue : Queue - { + { /** * Inserts the specified element into this queue if it is possible to do * so immediately without violating capacity restrictions, returning diff --git a/dotnet/Qpid.Common/Collections/SynchronousQueue.cs b/dotnet/Qpid.Common/Collections/SynchronousQueue.cs index d210ea3b42..a678a6c5fc 100644 --- a/dotnet/Qpid.Common/Collections/SynchronousQueue.cs +++ b/dotnet/Qpid.Common/Collections/SynchronousQueue.cs @@ -373,4 +373,3 @@ namespace Qpid.Collections } } } - diff --git a/dotnet/Qpid.Messaging/IChannel.cs b/dotnet/Qpid.Messaging/IChannel.cs index 247d164ae7..7fceb1a532 100644 --- a/dotnet/Qpid.Messaging/IChannel.cs +++ b/dotnet/Qpid.Messaging/IChannel.cs @@ -60,7 +60,8 @@ namespace Qpid.Messaging MessageConsumerBuilder CreateConsumerBuilder(string queueName); IMessageConsumer CreateConsumer(string queueName, - int prefetch, + int prefetchLow, + int prefetchHigh, bool noLocal, bool exclusive, bool durable, diff --git a/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs b/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs index 6699d63a79..4166dd0137 100644 --- a/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs +++ b/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs @@ -22,13 +22,16 @@ namespace Qpid.Messaging { public class MessageConsumerBuilder { - private int _prefetch = 0; + public const int DEFAULT_PREFETCH_HIGH = 5000; + private bool _noLocal = false; private bool _exclusive = false; private bool _durable = false; private string _subscriptionName = null; private IChannel _channel; private readonly string _queueName; + private int _prefetchLow = 2500; + private int _prefetchHigh = DEFAULT_PREFETCH_HIGH; public MessageConsumerBuilder(IChannel channel, string queueName) { @@ -36,31 +39,37 @@ namespace Qpid.Messaging _queueName = queueName; } - public MessageConsumerBuilder withPrefetch(int prefetch) + public MessageConsumerBuilder WithPrefetchLow(int prefetchLow) + { + _prefetchLow = prefetchLow; + return this; + } + + public MessageConsumerBuilder WithPrefetchHigh(int prefetchHigh) { - _prefetch = prefetch; + _prefetchHigh = prefetchHigh; return this; } - public MessageConsumerBuilder withNoLocal(bool noLocal) + public MessageConsumerBuilder WithNoLocal(bool noLocal) { _noLocal = noLocal; return this; } - public MessageConsumerBuilder withExclusive(bool exclusive) + public MessageConsumerBuilder WithExclusive(bool exclusive) { _exclusive = exclusive; return this; } - public MessageConsumerBuilder withDurable(bool durable) + public MessageConsumerBuilder WithDurable(bool durable) { _durable = durable; return this; } - public MessageConsumerBuilder withSubscriptionName(string subscriptionName) + public MessageConsumerBuilder WithSubscriptionName(string subscriptionName) { _subscriptionName = subscriptionName; return this; @@ -68,7 +77,7 @@ namespace Qpid.Messaging public IMessageConsumer Create() { - return _channel.CreateConsumer(_queueName, _prefetch, _noLocal, _exclusive, _durable, _subscriptionName); + return _channel.CreateConsumer(_queueName, _prefetchLow, _prefetchHigh, _noLocal, _exclusive, _durable, _subscriptionName); } } } diff --git a/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs b/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs index ecee1b5c57..ba843049ef 100644 --- a/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs +++ b/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs @@ -47,37 +47,37 @@ namespace Qpid.Messaging _channel = channel; } - public MessagePublisherBuilder withRoutingKey(string routingKey) + public MessagePublisherBuilder WithRoutingKey(string routingKey) { _routingKey = routingKey; return this; } - public MessagePublisherBuilder withExchangeName(string exchangeName) + public MessagePublisherBuilder WithExchangeName(string exchangeName) { _exchangeName = exchangeName; return this; } - public MessagePublisherBuilder withDeliveryMode(DeliveryMode deliveryMode) + public MessagePublisherBuilder WithDeliveryMode(DeliveryMode deliveryMode) { _deliveryMode = deliveryMode; return this; } - public MessagePublisherBuilder withTimeToLive(long timeToLive) + public MessagePublisherBuilder WithTimeToLive(long timeToLive) { _timeToLive = timeToLive; return this; } - public MessagePublisherBuilder withImmediate(bool immediate) + public MessagePublisherBuilder WithImmediate(bool immediate) { _immediate = immediate; return this; } - public MessagePublisherBuilder withMandatory(bool mandatory) + public MessagePublisherBuilder WithMandatory(bool mandatory) { _mandatory = mandatory; return this; diff --git a/dotnet/TODO.txt b/dotnet/TODO.txt index 93f359dacf..a8155ce297 100644 --- a/dotnet/TODO.txt +++ b/dotnet/TODO.txt @@ -1,13 +1,4 @@ -https://issues.apache.org/jira/browse/QPID-136 -* createSession with prefetch (warning: prefetch partly added) - * Do the BasicQos message after opening channel (sets up prefetch). - -https://issues.apache.org/jira/browse/QPID-137 -* .NET currently only supports no-ack mode. Allow acknowledgement support. - * Implement the PreAcknowledge ack mode. Add preDeliver/postDeliver methods in AmqSession like the Java client. - * Implement Recover() with Basic.Recover. - * Port Connection URL support. * Implement durable subscriptions. |