From 5518fd899d97459bcd8c45b850da447697a60fe8 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 23 Apr 2008 23:56:38 +0000 Subject: QPID-832 sync from M2.x git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651113 13f79535-47bb-0310-9956-ffa450edef68 --- .../testcases/CommitRollbackTest.cs | 181 +++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs (limited to 'qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs') diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs new file mode 100644 index 0000000000..aab279285e --- /dev/null +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// + /// CommitRollbackTest + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Check that an uncommitted send cannot be received. + ///
Check that a committed send can be received. + ///
Check that a rolled back send cannot be received. + ///
Check that an uncommitted receive can be re-received. + ///
Check that a committed receive cannot be re-received. + ///
Check that a rolled back receive can be re-received. + ///
+ ///

+ [TestFixture, Category("Integration")] + public class CommitRollbackTest : BaseMessagingTestFixture + { + /// Used for debugging purposes. + private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest)); + + /// Defines the name of the test topic to use with the tests. + public const string TEST_ROUTING_KEY = "commitrollbacktestkey"; + + [SetUp] + public override void Init() + { + base.Init(); + + // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + } + + [TearDown] + public override void Shutdown() + { + try + { + // Clean up after the test. + CloseEndPoint(0); + CloseEndPoint(1); + } + finally + { + base.Shutdown(); + } + } + + /// Check that an uncommitted send cannot be received. + [Test] + public void TestUncommittedSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "A", testConsumer[1]); + testChannel[1].Commit(); + } + + /// Check that a committed send can be received. + [Test] + public void TestCommittedSendReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "A", testConsumer[1]); + testChannel[1].Commit(); + } + + /// Check that a rolled back send cannot be received. + [Test] + public void TestRolledBackSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testChannel[0].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "A", testConsumer[1]); + testChannel[1].Commit(); + } + + /// Check that an uncommitted receive can be re-received. + [Test] + public void TestUncommittedReceiveCanBeRereceived() + { + // Create a third end-point as an alternative delivery route for the message. + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "A", testConsumer[1]); + + // Close end-point 1 without committing the message, then re-open to consume again. + CloseEndPoint(1); + + // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. + ConsumeNMessagesOnly(1, "A", testConsumer[2]); + + CloseEndPoint(2); + } + + /// Check that a committed receive cannot be re-received. + [Test] + public void TestCommittedReceiveNotRereceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "A", testConsumer[1]); + testChannel[1].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "A", testConsumer[1]); + } + + /// Check that a rolled back receive can be re-received. + [Test] + public void TestRolledBackReceiveCanBeRereceived() + { + // Create a third end-point as an alternative delivery route for the message. + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "A", testConsumer[1]); + + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "A", testConsumer[2]); + + CloseEndPoint(2); + } + } +} \ No newline at end of file -- cgit v1.2.1 From 0fb8d3ab90269c9f4df552fd0b03f0c557cfbd97 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 7 May 2008 14:09:16 +0000 Subject: Merged revisions 653420-654109 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x ........ r653447 | aidan | 2008-05-05 13:26:29 +0100 (Mon, 05 May 2008) | 1 line Check if consumer is closed and dont reclose it ........ r653451 | aidan | 2008-05-05 13:29:15 +0100 (Mon, 05 May 2008) | 1 line QPID-1022 Use synchronous writes to fix race conditions ........ r653452 | aidan | 2008-05-05 13:30:45 +0100 (Mon, 05 May 2008) | 1 line QPID-1023 increase some timeouts ........ r653760 | aidan | 2008-05-06 13:40:34 +0100 (Tue, 06 May 2008) | 3 lines QPID-1029: Generate temporary queue names using GUIDs to ensure uniqueness. ........ r654097 | aidan | 2008-05-07 14:25:38 +0100 (Wed, 07 May 2008) | 2 lines QPID-952, QPID-951, QPID-1032 Fix failover, ensure that it is properly detected, that frames are replayed approrpiately and that failover does not timeout. ........ r654104 | aidan | 2008-05-07 14:46:51 +0100 (Wed, 07 May 2008) | 1 line QPID-952 should have been part of previous commit ........ r654109 | aidan | 2008-05-07 14:56:09 +0100 (Wed, 07 May 2008) | 2 lines QPID-1036 increase timeouts to more reasonable levels, ensure that durable queues are deleted when no longer needed ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654113 13f79535-47bb-0310-9956-ffa450edef68 --- .../testcases/CommitRollbackTest.cs | 35 ++++++++++------------ 1 file changed, 15 insertions(+), 20 deletions(-) (limited to 'qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs') diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs index aab279285e..1951a8a171 100644 --- a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -94,11 +94,11 @@ namespace Apache.Qpid.Integration.Tests.testcases public void TestCommittedSendReceived() { // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); testChannel[0].Commit(); // Try to receive messages. - ConsumeNMessagesOnly(1, "A", testConsumer[1]); + ConsumeNMessagesOnly(1, "B", testConsumer[1]); testChannel[1].Commit(); } @@ -107,11 +107,11 @@ namespace Apache.Qpid.Integration.Tests.testcases public void TestRolledBackSendNotReceived() { // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); testChannel[0].Rollback(); // Try to receive messages. - ConsumeNMessagesOnly(0, "A", testConsumer[1]); + ConsumeNMessagesOnly(0, "B", testConsumer[1]); testChannel[1].Commit(); } @@ -124,17 +124,17 @@ namespace Apache.Qpid.Integration.Tests.testcases true, false, null); // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testProducer[0].Send(testChannel[0].CreateTextMessage("C")); testChannel[0].Commit(); // Try to receive messages. - ConsumeNMessagesOnly(1, "A", testConsumer[1]); + ConsumeNMessagesOnly(1, "C", testConsumer[1]); // Close end-point 1 without committing the message, then re-open to consume again. CloseEndPoint(1); // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. - ConsumeNMessagesOnly(1, "A", testConsumer[2]); + ConsumeNMessagesOnly(1, "C", testConsumer[2]); CloseEndPoint(2); } @@ -144,38 +144,33 @@ namespace Apache.Qpid.Integration.Tests.testcases public void TestCommittedReceiveNotRereceived() { // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testProducer[0].Send(testChannel[0].CreateTextMessage("D")); testChannel[0].Commit(); // Try to receive messages. - ConsumeNMessagesOnly(1, "A", testConsumer[1]); + ConsumeNMessagesOnly(1, "D", testConsumer[1]); testChannel[1].Commit(); // Try to receive messages. - ConsumeNMessagesOnly(0, "A", testConsumer[1]); + ConsumeNMessagesOnly(0, "D", testConsumer[1]); } /// Check that a rolled back receive can be re-received. [Test] public void TestRolledBackReceiveCanBeRereceived() { - // Create a third end-point as an alternative delivery route for the message. - SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, - true, false, null); - // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + testProducer[0].Send(testChannel[0].CreateTextMessage("E")); testChannel[0].Commit(); // Try to receive messages. - ConsumeNMessagesOnly(1, "A", testConsumer[1]); + ConsumeNMessagesOnly(1, "E", testConsumer[1]); testChannel[1].Rollback(); // Try to receive messages. - ConsumeNMessagesOnly(1, "A", testConsumer[2]); - - CloseEndPoint(2); + ConsumeNMessagesOnly(1, "E", testConsumer[1]); + } } -} \ No newline at end of file +} -- cgit v1.2.1 From 8316d2b0c1affdee50bee3ed8a6297a582f38c77 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 8 May 2008 15:09:17 +0000 Subject: QPID-1038 add a mixed send/recieve/rollback test git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654545 13f79535-47bb-0310-9956-ffa450edef68 --- .../testcases/CommitRollbackTest.cs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) (limited to 'qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs') diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs index 1951a8a171..3224d2cbd2 100644 --- a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -58,7 +58,7 @@ namespace Apache.Qpid.Integration.Tests.testcases // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key. SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); - SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); } @@ -172,5 +172,22 @@ namespace Apache.Qpid.Integration.Tests.testcases ConsumeNMessagesOnly(1, "E", testConsumer[1]); } + + [Test] + public void TestReceiveAndSendRollback() + { + // Send messages + testProducer[0].Send(testChannel[0].CreateTextMessage("F")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + testProducer[1].Send(testChannel[1].CreateTextMessage("G")); + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + + } } } -- cgit v1.2.1 From 831649f6424871791fa5fac2a5a7403cf4ae45e9 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 28 May 2008 14:56:54 +0000 Subject: QPID-1099 Add tests for publishing several messages transactionally and consuming them in an OnMessage handler git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@660966 13f79535-47bb-0310-9956-ffa450edef68 --- .../testcases/CommitRollbackTest.cs | 68 ++++++++++++++++++++++ 1 file changed, 68 insertions(+) (limited to 'qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs') diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs index 3224d2cbd2..72074da809 100644 --- a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -49,7 +49,19 @@ namespace Apache.Qpid.Integration.Tests.testcases /// Defines the name of the test topic to use with the tests. public const string TEST_ROUTING_KEY = "commitrollbacktestkey"; + + /// Used to count test messages received so far. + private int messageReceivedCount; + + /// Used to hold the expected number of messages to receive. + private int expectedMessageCount; + /// Monitor used to signal succesfull receipt of all test messages. + AutoResetEvent finishedEvent; + + /// Flag used to indicate that all messages really were received, and that the test did not just time out. + private bool allReceived; + [SetUp] public override void Init() { @@ -60,6 +72,12 @@ namespace Apache.Qpid.Integration.Tests.testcases true, false, null); SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); + + // Clear counts + messageReceivedCount = 0; + expectedMessageCount = 0; + finishedEvent = new AutoResetEvent(false); + allReceived = false; } [TearDown] @@ -189,5 +207,55 @@ namespace Apache.Qpid.Integration.Tests.testcases ConsumeNMessagesOnly(1, "F", testConsumer[1]); } + + [Test] + public void TestReceivePrePublished() + { + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + + for (int i = 0; i < 10; ++i) + { + ConsumeNMessages(1, "G"+i, testConsumer[1]); + } + testChannel[1].Commit(); + } + + [Test] + public void TestReceivePrePublishedOnMessageHandler() + { + testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage); + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + expectedMessageCount = 10; + + finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); + + // Check that all messages really were received. + Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount); + + testChannel[1].Commit(); + } + + /// Atomically increments the message count on every message, and signals once all messages in the test are received. + public void OnMessage(IMessage m) + { + int newCount = Interlocked.Increment(ref messageReceivedCount); + + if (newCount >= expectedMessageCount) + { + allReceived = true; + finishedEvent.Set(); + } + } + } } -- cgit v1.2.1 From de0e81996d63d8a8b7f92d835d2bbdeaf5cccae0 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 3 Dec 2009 23:55:48 +0000 Subject: Fix eol style property git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@886998 13f79535-47bb-0310-9956-ffa450edef68 --- .../testcases/CommitRollbackTest.cs | 522 ++++++++++----------- 1 file changed, 261 insertions(+), 261 deletions(-) (limited to 'qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs') diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs index 72074da809..dbb3f70aec 100644 --- a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -1,261 +1,261 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -using System; -using System.Threading; -using log4net; -using NUnit.Framework; -using Apache.Qpid.Messaging; -using Apache.Qpid.Client.Qms; -using Apache.Qpid.Client; - -namespace Apache.Qpid.Integration.Tests.testcases -{ - /// - /// CommitRollbackTest - /// - ///

- ///
CRC Card
Responsibilities Collaborations - ///
Check that an uncommitted send cannot be received. - ///
Check that a committed send can be received. - ///
Check that a rolled back send cannot be received. - ///
Check that an uncommitted receive can be re-received. - ///
Check that a committed receive cannot be re-received. - ///
Check that a rolled back receive can be re-received. - ///
- ///

- [TestFixture, Category("Integration")] - public class CommitRollbackTest : BaseMessagingTestFixture - { - /// Used for debugging purposes. - private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest)); - - /// Defines the name of the test topic to use with the tests. - public const string TEST_ROUTING_KEY = "commitrollbacktestkey"; - - /// Used to count test messages received so far. - private int messageReceivedCount; - - /// Used to hold the expected number of messages to receive. - private int expectedMessageCount; - - /// Monitor used to signal succesfull receipt of all test messages. - AutoResetEvent finishedEvent; - - /// Flag used to indicate that all messages really were received, and that the test did not just time out. - private bool allReceived; - - [SetUp] - public override void Init() - { - base.Init(); - - // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key. - SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, - true, false, null); - SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, - true, false, null); - - // Clear counts - messageReceivedCount = 0; - expectedMessageCount = 0; - finishedEvent = new AutoResetEvent(false); - allReceived = false; - } - - [TearDown] - public override void Shutdown() - { - try - { - // Clean up after the test. - CloseEndPoint(0); - CloseEndPoint(1); - } - finally - { - base.Shutdown(); - } - } - - /// Check that an uncommitted send cannot be received. - [Test] - public void TestUncommittedSendNotReceived() - { - // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("A")); - - // Try to receive messages. - ConsumeNMessagesOnly(0, "A", testConsumer[1]); - testChannel[1].Commit(); - } - - /// Check that a committed send can be received. - [Test] - public void TestCommittedSendReceived() - { - // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("B")); - testChannel[0].Commit(); - - // Try to receive messages. - ConsumeNMessagesOnly(1, "B", testConsumer[1]); - testChannel[1].Commit(); - } - - /// Check that a rolled back send cannot be received. - [Test] - public void TestRolledBackSendNotReceived() - { - // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("B")); - testChannel[0].Rollback(); - - // Try to receive messages. - ConsumeNMessagesOnly(0, "B", testConsumer[1]); - testChannel[1].Commit(); - } - - /// Check that an uncommitted receive can be re-received. - [Test] - public void TestUncommittedReceiveCanBeRereceived() - { - // Create a third end-point as an alternative delivery route for the message. - SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, - true, false, null); - - // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("C")); - testChannel[0].Commit(); - - // Try to receive messages. - ConsumeNMessagesOnly(1, "C", testConsumer[1]); - - // Close end-point 1 without committing the message, then re-open to consume again. - CloseEndPoint(1); - - // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. - ConsumeNMessagesOnly(1, "C", testConsumer[2]); - - CloseEndPoint(2); - } - - /// Check that a committed receive cannot be re-received. - [Test] - public void TestCommittedReceiveNotRereceived() - { - // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("D")); - testChannel[0].Commit(); - - // Try to receive messages. - ConsumeNMessagesOnly(1, "D", testConsumer[1]); - testChannel[1].Commit(); - - // Try to receive messages. - ConsumeNMessagesOnly(0, "D", testConsumer[1]); - } - - /// Check that a rolled back receive can be re-received. - [Test] - public void TestRolledBackReceiveCanBeRereceived() - { - // Send messages. - testProducer[0].Send(testChannel[0].CreateTextMessage("E")); - testChannel[0].Commit(); - - // Try to receive messages. - ConsumeNMessagesOnly(1, "E", testConsumer[1]); - - testChannel[1].Rollback(); - - // Try to receive messages. - ConsumeNMessagesOnly(1, "E", testConsumer[1]); - - } - - [Test] - public void TestReceiveAndSendRollback() - { - // Send messages - testProducer[0].Send(testChannel[0].CreateTextMessage("F")); - testChannel[0].Commit(); - - // Try to receive messages. - ConsumeNMessagesOnly(1, "F", testConsumer[1]); - testProducer[1].Send(testChannel[1].CreateTextMessage("G")); - testChannel[1].Rollback(); - - // Try to receive messages. - ConsumeNMessagesOnly(1, "F", testConsumer[1]); - - } - - [Test] - public void TestReceivePrePublished() - { - // Send messages - for (int i = 0; i < 10; ++i) - { - testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); - testChannel[0].Commit(); - } - - for (int i = 0; i < 10; ++i) - { - ConsumeNMessages(1, "G"+i, testConsumer[1]); - } - testChannel[1].Commit(); - } - - [Test] - public void TestReceivePrePublishedOnMessageHandler() - { - testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage); - // Send messages - for (int i = 0; i < 10; ++i) - { - testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); - testChannel[0].Commit(); - } - expectedMessageCount = 10; - - finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); - - // Check that all messages really were received. - Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount); - - testChannel[1].Commit(); - } - - /// Atomically increments the message count on every message, and signals once all messages in the test are received. - public void OnMessage(IMessage m) - { - int newCount = Interlocked.Increment(ref messageReceivedCount); - - if (newCount >= expectedMessageCount) - { - allReceived = true; - finishedEvent.Set(); - } - } - - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// + /// CommitRollbackTest + /// + ///

+ ///
CRC Card
Responsibilities Collaborations + ///
Check that an uncommitted send cannot be received. + ///
Check that a committed send can be received. + ///
Check that a rolled back send cannot be received. + ///
Check that an uncommitted receive can be re-received. + ///
Check that a committed receive cannot be re-received. + ///
Check that a rolled back receive can be re-received. + ///
+ ///

+ [TestFixture, Category("Integration")] + public class CommitRollbackTest : BaseMessagingTestFixture + { + /// Used for debugging purposes. + private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest)); + + /// Defines the name of the test topic to use with the tests. + public const string TEST_ROUTING_KEY = "commitrollbacktestkey"; + + /// Used to count test messages received so far. + private int messageReceivedCount; + + /// Used to hold the expected number of messages to receive. + private int expectedMessageCount; + + /// Monitor used to signal succesfull receipt of all test messages. + AutoResetEvent finishedEvent; + + /// Flag used to indicate that all messages really were received, and that the test did not just time out. + private bool allReceived; + + [SetUp] + public override void Init() + { + base.Init(); + + // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Clear counts + messageReceivedCount = 0; + expectedMessageCount = 0; + finishedEvent = new AutoResetEvent(false); + allReceived = false; + } + + [TearDown] + public override void Shutdown() + { + try + { + // Clean up after the test. + CloseEndPoint(0); + CloseEndPoint(1); + } + finally + { + base.Shutdown(); + } + } + + /// Check that an uncommitted send cannot be received. + [Test] + public void TestUncommittedSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "A", testConsumer[1]); + testChannel[1].Commit(); + } + + /// Check that a committed send can be received. + [Test] + public void TestCommittedSendReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "B", testConsumer[1]); + testChannel[1].Commit(); + } + + /// Check that a rolled back send cannot be received. + [Test] + public void TestRolledBackSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); + testChannel[0].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "B", testConsumer[1]); + testChannel[1].Commit(); + } + + /// Check that an uncommitted receive can be re-received. + [Test] + public void TestUncommittedReceiveCanBeRereceived() + { + // Create a third end-point as an alternative delivery route for the message. + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("C")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "C", testConsumer[1]); + + // Close end-point 1 without committing the message, then re-open to consume again. + CloseEndPoint(1); + + // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. + ConsumeNMessagesOnly(1, "C", testConsumer[2]); + + CloseEndPoint(2); + } + + /// Check that a committed receive cannot be re-received. + [Test] + public void TestCommittedReceiveNotRereceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("D")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "D", testConsumer[1]); + testChannel[1].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "D", testConsumer[1]); + } + + /// Check that a rolled back receive can be re-received. + [Test] + public void TestRolledBackReceiveCanBeRereceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("E")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "E", testConsumer[1]); + + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "E", testConsumer[1]); + + } + + [Test] + public void TestReceiveAndSendRollback() + { + // Send messages + testProducer[0].Send(testChannel[0].CreateTextMessage("F")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + testProducer[1].Send(testChannel[1].CreateTextMessage("G")); + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + + } + + [Test] + public void TestReceivePrePublished() + { + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + + for (int i = 0; i < 10; ++i) + { + ConsumeNMessages(1, "G"+i, testConsumer[1]); + } + testChannel[1].Commit(); + } + + [Test] + public void TestReceivePrePublishedOnMessageHandler() + { + testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage); + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + expectedMessageCount = 10; + + finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); + + // Check that all messages really were received. + Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount); + + testChannel[1].Commit(); + } + + /// Atomically increments the message count on every message, and signals once all messages in the test are received. + public void OnMessage(IMessage m) + { + int newCount = Interlocked.Increment(ref messageReceivedCount); + + if (newCount >= expectedMessageCount) + { + allReceived = true; + finishedEvent.Set(); + } + } + + } +} -- cgit v1.2.1