diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs | 261 |
1 files changed, 261 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs new file mode 100644 index 0000000000..dbb3f70aec --- /dev/null +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -0,0 +1,261 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// CommitRollbackTest + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Check that an uncommitted send cannot be received. + /// <tr><td> Check that a committed send can be received. + /// <tr><td> Check that a rolled back send cannot be received. + /// <tr><td> Check that an uncommitted receive can be re-received. + /// <tr><td> Check that a committed receive cannot be re-received. + /// <tr><td> Check that a rolled back receive can be re-received. + /// </table> + /// </summary> + [TestFixture, Category("Integration")] + public class CommitRollbackTest : BaseMessagingTestFixture + { + /// <summary>Used for debugging purposes.</summary> + private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest)); + + /// <summary>Defines the name of the test topic to use with the tests.</summary> + public const string TEST_ROUTING_KEY = "commitrollbacktestkey"; + + /// <summary>Used to count test messages received so far.</summary> + private int messageReceivedCount; + + /// <summary>Used to hold the expected number of messages to receive.</summary> + private int expectedMessageCount; + + /// <summary>Monitor used to signal succesfull receipt of all test messages.</summary> + AutoResetEvent finishedEvent; + + /// <summary>Flag used to indicate that all messages really were received, and that the test did not just time out. </summary> + private bool allReceived; + + [SetUp] + public override void Init() + { + base.Init(); + + // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Clear counts + messageReceivedCount = 0; + expectedMessageCount = 0; + finishedEvent = new AutoResetEvent(false); + allReceived = false; + } + + [TearDown] + public override void Shutdown() + { + try + { + // Clean up after the test. + CloseEndPoint(0); + CloseEndPoint(1); + } + finally + { + base.Shutdown(); + } + } + + /// <summary> Check that an uncommitted send cannot be received. </summary> + [Test] + public void TestUncommittedSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "A", testConsumer[1]); + testChannel[1].Commit(); + } + + /// <summary> Check that a committed send can be received. </summary> + [Test] + public void TestCommittedSendReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "B", testConsumer[1]); + testChannel[1].Commit(); + } + + /// <summary> Check that a rolled back send cannot be received. </summary> + [Test] + public void TestRolledBackSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); + testChannel[0].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "B", testConsumer[1]); + testChannel[1].Commit(); + } + + /// <summary> Check that an uncommitted receive can be re-received. </summary> + [Test] + public void TestUncommittedReceiveCanBeRereceived() + { + // Create a third end-point as an alternative delivery route for the message. + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("C")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "C", testConsumer[1]); + + // Close end-point 1 without committing the message, then re-open to consume again. + CloseEndPoint(1); + + // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. + ConsumeNMessagesOnly(1, "C", testConsumer[2]); + + CloseEndPoint(2); + } + + /// <summary> Check that a committed receive cannot be re-received. </summary> + [Test] + public void TestCommittedReceiveNotRereceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("D")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "D", testConsumer[1]); + testChannel[1].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "D", testConsumer[1]); + } + + /// <summary> Check that a rolled back receive can be re-received. </summary> + [Test] + public void TestRolledBackReceiveCanBeRereceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("E")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "E", testConsumer[1]); + + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "E", testConsumer[1]); + + } + + [Test] + public void TestReceiveAndSendRollback() + { + // Send messages + testProducer[0].Send(testChannel[0].CreateTextMessage("F")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + testProducer[1].Send(testChannel[1].CreateTextMessage("G")); + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + + } + + [Test] + public void TestReceivePrePublished() + { + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + + for (int i = 0; i < 10; ++i) + { + ConsumeNMessages(1, "G"+i, testConsumer[1]); + } + testChannel[1].Commit(); + } + + [Test] + public void TestReceivePrePublishedOnMessageHandler() + { + testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage); + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + expectedMessageCount = 10; + + finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); + + // Check that all messages really were received. + Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount); + + testChannel[1].Commit(); + } + + /// <summary> Atomically increments the message count on every message, and signals once all messages in the test are received. </summary> + public void OnMessage(IMessage m) + { + int newCount = Interlocked.Increment(ref messageReceivedCount); + + if (newCount >= expectedMessageCount) + { + allReceived = true; + finishedEvent.Set(); + } + } + + } +} |