diff options
Diffstat (limited to 'dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs')
-rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs | 37 |
1 files changed, 25 insertions, 12 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs index 79a04e79eb..923d8c4049 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -36,8 +36,11 @@ namespace Qpid.Client.Tests.failover const int NUM_ITERATIONS = 10; const int NUM_COMMITED_MESSAGES = 10; const int NUM_ROLLEDBACK_MESSAGES = 3; - const int SLEEP_MILLIS = 500; + const int SLEEP_MILLIS = 50; + // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge + AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge; + const bool _noWait = true; // use Receive or ReceiveNoWait AMQConnection _connection; public void OnMessage(IMessage message) @@ -45,6 +48,11 @@ namespace Qpid.Client.Tests.failover try { _log.Info("Received: " + ((ITextMessage) message).Text); + if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) + { + _log.Info("client acknowledging"); + message.Acknowledge(); + } } catch (QpidException e) { @@ -56,11 +64,13 @@ namespace Qpid.Client.Tests.failover { FailoverTxTest _failoverTxTest; IMessageConsumer _consumer; + private bool _noWait; - internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel) + internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait) { _failoverTxTest = failoverTxTest; _consumer = channel; + _noWait = noWait; } internal void Run() @@ -68,7 +78,9 @@ namespace Qpid.Client.Tests.failover int messages = 0; while (messages < NUM_COMMITED_MESSAGES) { - IMessage msg = _consumer.ReceiveNoWait(); + IMessage msg; + if (_noWait) msg = _consumer.ReceiveNoWait(); + else msg = _consumer.Receive(); if (msg != null) { _log.Info("NoWait received message"); @@ -91,9 +103,10 @@ namespace Qpid.Client.Tests.failover _connection.ConnectionListener = this; _log.Info("connection = " + _connection); _log.Info("connectionInfo = " + connectionInfo); - _log.Info("connection.asUrl = " + _connection.toURL()); + _log.Info("connection.AsUrl = " + _connection.toURL()); - IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge); + _log.Info("AcknowledgeMode is " + _acknowledgeMode); + IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); string queueName = receivingChannel.GenerateUniqueName(); @@ -103,17 +116,17 @@ namespace Qpid.Client.Tests.failover // No need to call Queue.Bind as automatically bound to default direct exchange. receivingChannel.Bind(queueName, "amq.direct", queueName); - - IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create(); - bool useThread = true; + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) + .WithPrefetchLow(30) + .WithPrefetchHigh(60).Create(); + bool useThread = false; if (useThread) { - NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer); - new Thread(noWaitConsumer.Run).Start(); + NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait); + new Thread(new ThreadStart(noWaitConsumer.Run)).Start(); } else { - //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); consumer.OnMessage = new MessageReceivedDelegate(OnMessage); } @@ -133,7 +146,7 @@ namespace Qpid.Client.Tests.failover bool transacted = true; IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder() - .withRoutingKey(routingKey) + .WithRoutingKey(routingKey) .Create(); for (int i = 1; i <= NUM_ITERATIONS; ++i) |