summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs')
-rw-r--r--dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs37
1 files changed, 25 insertions, 12 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
index 79a04e79eb..923d8c4049 100644
--- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
+++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
@@ -36,8 +36,11 @@ namespace Qpid.Client.Tests.failover
const int NUM_ITERATIONS = 10;
const int NUM_COMMITED_MESSAGES = 10;
const int NUM_ROLLEDBACK_MESSAGES = 3;
- const int SLEEP_MILLIS = 500;
+ const int SLEEP_MILLIS = 50;
+ // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge
+ AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge;
+ const bool _noWait = true; // use Receive or ReceiveNoWait
AMQConnection _connection;
public void OnMessage(IMessage message)
@@ -45,6 +48,11 @@ namespace Qpid.Client.Tests.failover
try
{
_log.Info("Received: " + ((ITextMessage) message).Text);
+ if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+ {
+ _log.Info("client acknowledging");
+ message.Acknowledge();
+ }
}
catch (QpidException e)
{
@@ -56,11 +64,13 @@ namespace Qpid.Client.Tests.failover
{
FailoverTxTest _failoverTxTest;
IMessageConsumer _consumer;
+ private bool _noWait;
- internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel)
+ internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait)
{
_failoverTxTest = failoverTxTest;
_consumer = channel;
+ _noWait = noWait;
}
internal void Run()
@@ -68,7 +78,9 @@ namespace Qpid.Client.Tests.failover
int messages = 0;
while (messages < NUM_COMMITED_MESSAGES)
{
- IMessage msg = _consumer.ReceiveNoWait();
+ IMessage msg;
+ if (_noWait) msg = _consumer.ReceiveNoWait();
+ else msg = _consumer.Receive();
if (msg != null)
{
_log.Info("NoWait received message");
@@ -91,9 +103,10 @@ namespace Qpid.Client.Tests.failover
_connection.ConnectionListener = this;
_log.Info("connection = " + _connection);
_log.Info("connectionInfo = " + connectionInfo);
- _log.Info("connection.asUrl = " + _connection.toURL());
+ _log.Info("connection.AsUrl = " + _connection.toURL());
- IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
+ _log.Info("AcknowledgeMode is " + _acknowledgeMode);
+ IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
string queueName = receivingChannel.GenerateUniqueName();
@@ -103,17 +116,17 @@ namespace Qpid.Client.Tests.failover
// No need to call Queue.Bind as automatically bound to default direct exchange.
receivingChannel.Bind(queueName, "amq.direct", queueName);
-
- IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create();
- bool useThread = true;
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(30)
+ .WithPrefetchHigh(60).Create();
+ bool useThread = false;
if (useThread)
{
- NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer);
- new Thread(noWaitConsumer.Run).Start();
+ NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait);
+ new Thread(new ThreadStart(noWaitConsumer.Run)).Start();
}
else
{
- //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
}
@@ -133,7 +146,7 @@ namespace Qpid.Client.Tests.failover
bool transacted = true;
IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder()
- .withRoutingKey(routingKey)
+ .WithRoutingKey(routingKey)
.Create();
for (int i = 1; i <= NUM_ITERATIONS; ++i)