summaryrefslogtreecommitdiff
path: root/qpid/dotnet/Qpid.Integration.Tests/testcases
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/dotnet/Qpid.Integration.Tests/testcases
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Integration.Tests/testcases')
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs280
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs237
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs261
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs73
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs166
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs282
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs149
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs167
-rwxr-xr-xqpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj64
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs121
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs64
-rw-r--r--qpid/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs109
12 files changed, 1973 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
new file mode 100644
index 0000000000..e67d96f188
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs
@@ -0,0 +1,280 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Provides a basis for writing Unit tests that communicate with an AMQ protocol broker. By default it creates a connection
+ /// to a message broker running on localhost on the standard AMQ port, 5672, using guest:guest login credentials. It also
+ /// creates a standard auto-ack channel on this connection.
+ /// </summary>
+ public class BaseMessagingTestFixture
+ {
+ private static ILog log = LogManager.GetLogger(typeof(BaseMessagingTestFixture));
+
+ /// <summary> Used to build dummy data to fill test messages with. </summary>
+ private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message ";
+
+ /// <summary> The default timeout in milliseconds to use on receives. </summary>
+ private const long RECEIVE_WAIT = 2000;
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'";
+
+ /// <summary> The default AMQ connection URL parsed as a connection info. </summary>
+ protected IConnectionInfo connectionInfo;
+
+ /// <summary> Holds an array of connections for building mutiple test end-points. </summary>
+ protected IConnection[] testConnection = new IConnection[10];
+
+ /// <summary> Holds an array of channels for building mutiple test end-points. </summary>
+ protected IChannel[] testChannel = new IChannel[10];
+
+ /// <summary> Holds an array of queues for building mutiple test end-points. </summary>
+ protected String[] testQueue = new String[10];
+
+ /// <summary> Holds an array of producers for building mutiple test end-points. </summary>
+ protected IMessagePublisher[] testProducer = new IMessagePublisher[10];
+
+ /// <summary> Holds an array of consumers for building mutiple test end-points. </summary>
+ protected IMessageConsumer[] testConsumer = new IMessageConsumer[10];
+
+ /// <summary> A counter used to supply unique ids. </summary>
+ private static int uniqueId = 0;
+
+ /// <summary> Used to hold unique ids per test. </summary>
+ protected Guid testId;
+
+ /// <summary> Creates the test connection and channel. </summary>
+ [SetUp]
+ public virtual void Init()
+ {
+ log.Debug("public virtual void Init(): called");
+
+ // Set up a unique id for this test.
+ testId = System.Guid.NewGuid();
+ }
+
+ /// <summary>
+ /// Disposes of the test connection. This is called manually because the connection is a field so dispose will not be automatically
+ /// called on it.
+ /// </summary>
+ [TearDown]
+ public virtual void Shutdown()
+ {
+ log.Debug("public virtual void Shutdown(): called");
+ }
+
+ /// <summary> Sets up the nth test end-point. </summary>
+ ///
+ /// <param name="n">The index of the test end-point to set up.</param>
+ /// <param name="producer"><tt>true</tt> to set up a producer on the end-point.</param>
+ /// <param name="consumer"><tt>true</tt> to set up a consumer on the end-point.</param>
+ /// <param name="routingKey">The routing key for the producer to send on.</param>
+ /// <param name="ackMode">The ack mode for the end-points channel.</param>
+ /// <param name="transacted"><tt>true</tt> to use transactions on the end-points channel.</param>
+ /// <param name="exchangeName">The exchange to produce or consume on.</param>
+ /// <param name="declareBind"><tt>true</tt> if the consumers queue should be declared and bound, <tt>false</tt> if it has already been.</param>
+ /// <param name="durable"><tt>true</tt> to declare the consumers queue as durable.</param>
+ /// <param name="subscriptionName">If durable is true, the fixed unique queue name to use.</param>
+ public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted,
+ string exchangeName, bool declareBind, bool durable, string subscriptionName)
+ {
+ SetUpEndPoint(n, producer, consumer, routingKey, ackMode, transacted, exchangeName, declareBind, durable, subscriptionName, true, false);
+ }
+ /// <summary> Sets up the nth test end-point. </summary>
+ ///
+ /// <param name="n">The index of the test end-point to set up.</param>
+ /// <param name="producer"><tt>true</tt> to set up a producer on the end-point.</param>
+ /// <param name="consumer"><tt>true</tt> to set up a consumer on the end-point.</param>
+ /// <param name="routingKey">The routing key for the producer to send on.</param>
+ /// <param name="ackMode">The ack mode for the end-points channel.</param>
+ /// <param name="transacted"><tt>true</tt> to use transactions on the end-points channel.</param>
+ /// <param name="exchangeName">The exchange to produce or consume on.</param>
+ /// <param name="declareBind"><tt>true</tt> if the consumers queue should be declared and bound, <tt>false</tt> if it has already been.</param>
+ /// <param name="durable"><tt>true</tt> to declare the consumers queue as durable.</param>
+ /// <param name="subscriptionName">If durable is true, the fixed unique queue name to use.</param>
+ /// <param name="exclusive"><tt>true</tt> declare queue as exclusive.</param>
+ /// <param name="browse"><tt>true</tt> only browse, don''t consume.</param>
+ public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted,
+ string exchangeName, bool declareBind, bool durable, string subscriptionName, bool exclusive, bool browse)
+ {
+ // Allow client id to be fixed, or undefined.
+ {
+ // Use unique id for end point.
+ connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
+
+ connectionInfo.ClientName = "test" + n;
+ }
+
+ testConnection[n] = new AMQConnection(connectionInfo);
+ testConnection[n].Start();
+ testChannel[n] = testConnection[n].CreateChannel(transacted, ackMode);
+
+ if (producer)
+ {
+ testProducer[n] = testChannel[n].CreatePublisherBuilder()
+ .WithExchangeName(exchangeName)
+ .WithRoutingKey(routingKey)
+ .Create();
+ }
+
+ if (consumer)
+ {
+ string queueName;
+
+ // Use the subscription name as the queue name if the subscription is durable, otherwise use a generated name.
+ if (durable)
+ {
+ // The durable queue is declared without auto-delete, and passively, in case it has already been declared.
+ queueName = subscriptionName;
+
+ if (declareBind)
+ {
+ testChannel[n].DeclareQueue(queueName, durable, exclusive, false);
+ testChannel[n].Bind(queueName, exchangeName, routingKey);
+ }
+ }
+ else
+ {
+ queueName = testChannel[n].GenerateUniqueName();
+
+ if (declareBind)
+ {
+ if (durable)
+ {
+ testQueue[n] = queueName;
+ }
+ testChannel[n].DeclareQueue(queueName, durable, true, true);
+ testChannel[n].Bind(queueName, exchangeName, routingKey);
+ }
+ }
+
+ testConsumer[n] = testChannel[n].CreateConsumerBuilder(queueName).WithBrowse(browse).Create();
+ }
+ }
+
+ /// <summary> Closes down the nth test end-point. </summary>
+ public void CloseEndPoint(int n)
+ {
+ log.Debug("public void CloseEndPoint(int n): called");
+
+ if (testProducer[n] != null)
+ {
+ testProducer[n].Close();
+ testProducer[n].Dispose();
+ testProducer[n] = null;
+ }
+
+ if (testConsumer[n] != null)
+ {
+ if (testQueue[n] != null)
+ {
+ testChannel[n].DeleteQueue(testQueue[n], false, false, true);
+ }
+ testConsumer[n].Close();
+ testConsumer[n].Dispose();
+ testConsumer[n] = null;
+ }
+
+ if (testConnection[n] != null)
+ {
+ testConnection[n].Stop();
+ testConnection[n].Close();
+ testConnection[n].Dispose();
+ testConnection[n] = null;
+ }
+ }
+
+ /// <summary>
+ /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages
+ /// are text messages with contents equal to the specified message body.
+ /// </summary>
+ ///
+ /// <param name="n">The number of messages to consume.</param>
+ /// <param name="body">The body text to match against all messages.</param>
+ /// <param name="consumer">The message consumer to recieve the messages on.</param>
+ public static void ConsumeNMessagesOnly(int n, string body, IMessageConsumer consumer)
+ {
+ ConsumeNMessages(n, body, consumer);
+
+ // Check that one more than n cannot be received.
+ IMessage msg = consumer.Receive(RECEIVE_WAIT);
+ Assert.IsNull(msg, "Consumer got more messages than the number requested (" + n + ").");
+ }
+
+ /// <summary>
+ /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages
+ /// are text messages with contents equal to the specified message body.
+ /// </summary>
+ ///
+ /// <param name="n">The number of messages to consume.</param>
+ /// <param name="body">The body text to match against all messages.</param>
+ /// <param name="consumer">The message consumer to recieve the messages on.</param>
+ public static void ConsumeNMessages(int n, string body, IMessageConsumer consumer)
+ {
+ IMessage msg;
+
+ // Try to receive n messages.
+ for (int i = 0; i < n; i++)
+ {
+ msg = consumer.Receive(RECEIVE_WAIT);
+ Assert.IsNotNull(msg, "Consumer did not receive message number: " + i);
+ Assert.AreEqual(body, ((ITextMessage)msg).Text, "Incorrect Message recevied on consumer1.");
+ }
+ }
+
+ /// <summary>Creates the requested number of bytes of dummy text. Usually used for filling test messages. </summary>
+ ///
+ /// <param name="size">The number of bytes of dummy text to generate.</param>
+ ///
+ /// <return>The requested number of bytes of dummy text.</return>
+ public static String GetData(int size)
+ {
+ StringBuilder buf = new StringBuilder(size);
+
+ if (size > 0)
+ {
+ int div = MESSAGE_DATA_BYTES.Length / size;
+ int mod = MESSAGE_DATA_BYTES.Length % size;
+
+ for (int i = 0; i < div; i++)
+ {
+ buf.Append(MESSAGE_DATA_BYTES);
+ }
+
+ if (mod != 0)
+ {
+ buf.Append(MESSAGE_DATA_BYTES, 0, mod);
+ }
+ }
+
+ return buf.ToString();
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
new file mode 100644
index 0000000000..4692e7ecb1
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs
@@ -0,0 +1,237 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Net;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+using NUnit.Framework;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Test the queue methods
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class ChannelQueueTest
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+ const string _routingKey = "ServiceQ1";
+
+ private ExceptionListenerDelegate _exceptionDelegate;
+ private AutoResetEvent _evt = new AutoResetEvent(false);
+ private Exception _lastException = null;
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+ private IChannel _channel;
+ private IConnection _connection;
+
+ private string _queueName;
+
+ [SetUp]
+ public virtual void Init()
+ {
+ _logger.Info("public virtual void Init(): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI);
+ _connection = new AMQConnection(connectionInfo);
+ _logger.Info("Starting...");
+
+ // Register this to listen for exceptions on the test connection.
+ _exceptionDelegate = new ExceptionListenerDelegate(OnException);
+ _connection.ExceptionListener += _exceptionDelegate;
+
+ // Establish a session on the broker.
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Create a durable, non-temporary, non-exclusive queue.
+ _queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(_queueName, true, false, false);
+
+ _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey);
+
+ // Clear the most recent message and exception.
+ _lastException = null;
+ }
+
+ [TearDown]
+ public virtual void ShutDown()
+ {
+ _logger.Info("public virtual void Shutdown(): called");
+
+ if (_connection != null)
+ {
+ _logger.Info("Disposing connection.");
+ _connection.Dispose();
+ _logger.Info("Connection disposed.");
+ }
+ }
+
+ [Test]
+ public void DeleteUsedQueue()
+ {
+ // Create the consumer
+ _consumer = _channel.CreateConsumerBuilder(_queueName)
+ .WithPrefetchLow(100)
+ .Create();
+ _logger.Info("Consumer was created...");
+
+ // delete the queue
+ _channel.DeleteQueue(_queueName, false, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteUnusedQueue()
+ {
+ // delete the queue
+ _channel.DeleteQueue(_queueName, true, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteNonEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+ SendTestMessage("DeleteNonEmptyQueue Message 1");
+
+ try
+ {
+ _channel.DeleteQueue(_queueName, true, false, true);
+ }
+ catch (AMQException)
+ {
+ Assert.Fail("The test fails");
+ }
+ }
+
+ [Test]
+ public void DeleteEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ // delete an empty queue with ifEmpty = true
+ _channel.DeleteQueue(_queueName, false, true, true);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteQueueWithResponse()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ SendTestMessage("DeleteQueueWithResponse Message 1");
+ SendTestMessage("DeleteQueueWithResponse Message 2");
+
+ // delete the queue, the server must respond
+ _channel.DeleteQueue(_queueName, false, false, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithOutResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, true);
+ }
+
+
+ /// <summary>
+ /// Callback method to handle any exceptions raised by the test connection.</summary> ///
+ /// <param name="e">The connection exception.</param>
+ public void OnException(Exception e)
+ {
+ // Preserve the most recent exception in case test cases need to examine it.
+ _lastException = e;
+
+ // Notify any waiting threads that an exception event has occurred.
+ _evt.Set();
+ }
+
+ /// <summary>
+ /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
+ /// depending on whether or not the message should be received by the consumer.
+ ///
+ /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
+ /// </summary>
+ ///
+ /// <param name="msgSend">The message to send.</param>
+ private void SendTestMessage(string msg)
+ {
+ // create the IMessage object
+ IMessage msgSend = _channel.CreateTextMessage(msg);
+
+ // send the message
+ _publisher.Send(msgSend);
+ _logger.InfoFormat("The messages \"{0}\" was sent", msg);
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
new file mode 100644
index 0000000000..dbb3f70aec
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs
@@ -0,0 +1,261 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// CommitRollbackTest
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Check that an uncommitted send cannot be received.
+ /// <tr><td> Check that a committed send can be received.
+ /// <tr><td> Check that a rolled back send cannot be received.
+ /// <tr><td> Check that an uncommitted receive can be re-received.
+ /// <tr><td> Check that a committed receive cannot be re-received.
+ /// <tr><td> Check that a rolled back receive can be re-received.
+ /// </table>
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class CommitRollbackTest : BaseMessagingTestFixture
+ {
+ /// <summary>Used for debugging purposes.</summary>
+ private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest));
+
+ /// <summary>Defines the name of the test topic to use with the tests.</summary>
+ public const string TEST_ROUTING_KEY = "commitrollbacktestkey";
+
+ /// <summary>Used to count test messages received so far.</summary>
+ private int messageReceivedCount;
+
+ /// <summary>Used to hold the expected number of messages to receive.</summary>
+ private int expectedMessageCount;
+
+ /// <summary>Monitor used to signal succesfull receipt of all test messages.</summary>
+ AutoResetEvent finishedEvent;
+
+ /// <summary>Flag used to indicate that all messages really were received, and that the test did not just time out. </summary>
+ private bool allReceived;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+
+ // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+ SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
+ // Clear counts
+ messageReceivedCount = 0;
+ expectedMessageCount = 0;
+ finishedEvent = new AutoResetEvent(false);
+ allReceived = false;
+ }
+
+ [TearDown]
+ public override void Shutdown()
+ {
+ try
+ {
+ // Clean up after the test.
+ CloseEndPoint(0);
+ CloseEndPoint(1);
+ }
+ finally
+ {
+ base.Shutdown();
+ }
+ }
+
+ /// <summary> Check that an uncommitted send cannot be received. </summary>
+ [Test]
+ public void TestUncommittedSendNotReceived()
+ {
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(0, "A", testConsumer[1]);
+ testChannel[1].Commit();
+ }
+
+ /// <summary> Check that a committed send can be received. </summary>
+ [Test]
+ public void TestCommittedSendReceived()
+ {
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
+ testChannel[0].Commit();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(1, "B", testConsumer[1]);
+ testChannel[1].Commit();
+ }
+
+ /// <summary> Check that a rolled back send cannot be received. </summary>
+ [Test]
+ public void TestRolledBackSendNotReceived()
+ {
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
+ testChannel[0].Rollback();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(0, "B", testConsumer[1]);
+ testChannel[1].Commit();
+ }
+
+ /// <summary> Check that an uncommitted receive can be re-received. </summary>
+ [Test]
+ public void TestUncommittedReceiveCanBeRereceived()
+ {
+ // Create a third end-point as an alternative delivery route for the message.
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
+ testChannel[0].Commit();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
+
+ // Close end-point 1 without committing the message, then re-open to consume again.
+ CloseEndPoint(1);
+
+ // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
+ ConsumeNMessagesOnly(1, "C", testConsumer[2]);
+
+ CloseEndPoint(2);
+ }
+
+ /// <summary> Check that a committed receive cannot be re-received. </summary>
+ [Test]
+ public void TestCommittedReceiveNotRereceived()
+ {
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
+ testChannel[0].Commit();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
+ testChannel[1].Commit();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(0, "D", testConsumer[1]);
+ }
+
+ /// <summary> Check that a rolled back receive can be re-received. </summary>
+ [Test]
+ public void TestRolledBackReceiveCanBeRereceived()
+ {
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("E"));
+ testChannel[0].Commit();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(1, "E", testConsumer[1]);
+
+ testChannel[1].Rollback();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(1, "E", testConsumer[1]);
+
+ }
+
+ [Test]
+ public void TestReceiveAndSendRollback()
+ {
+ // Send messages
+ testProducer[0].Send(testChannel[0].CreateTextMessage("F"));
+ testChannel[0].Commit();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(1, "F", testConsumer[1]);
+ testProducer[1].Send(testChannel[1].CreateTextMessage("G"));
+ testChannel[1].Rollback();
+
+ // Try to receive messages.
+ ConsumeNMessagesOnly(1, "F", testConsumer[1]);
+
+ }
+
+ [Test]
+ public void TestReceivePrePublished()
+ {
+ // Send messages
+ for (int i = 0; i < 10; ++i)
+ {
+ testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i));
+ testChannel[0].Commit();
+ }
+
+ for (int i = 0; i < 10; ++i)
+ {
+ ConsumeNMessages(1, "G"+i, testConsumer[1]);
+ }
+ testChannel[1].Commit();
+ }
+
+ [Test]
+ public void TestReceivePrePublishedOnMessageHandler()
+ {
+ testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage);
+ // Send messages
+ for (int i = 0; i < 10; ++i)
+ {
+ testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i));
+ testChannel[0].Commit();
+ }
+ expectedMessageCount = 10;
+
+ finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
+
+ // Check that all messages really were received.
+ Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount);
+
+ testChannel[1].Commit();
+ }
+
+ /// <summary> Atomically increments the message count on every message, and signals once all messages in the test are received. </summary>
+ public void OnMessage(IMessage m)
+ {
+ int newCount = Interlocked.Increment(ref messageReceivedCount);
+
+ if (newCount >= expectedMessageCount)
+ {
+ allReceived = true;
+ finishedEvent.Set();
+ }
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs
new file mode 100644
index 0000000000..d7b4a4ddd2
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ [TestFixture, Category("Integration")]
+ public class ConnectionTest
+ {
+ private AmqBrokerInfo _broker =
+ new AmqBrokerInfo("amqp", "localhost", 5672, false);
+
+ [Test]
+ public void SimpleConnection()
+ {
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.VirtualHost = "test";
+ connectionInfo.AddBrokerInfo(_broker);
+ using (IConnection connection = new AMQConnection(connectionInfo))
+ {
+ Console.WriteLine("connection = " + connection);
+ }
+ }
+
+ [Test]
+ [ExpectedException(typeof(AMQAuthenticationException))]
+ public void PasswordFailureConnection()
+ {
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.VirtualHost = "test";
+ connectionInfo.Password = "rubbish";
+ connectionInfo.AddBrokerInfo(_broker);
+
+ using (IConnection connection = new AMQConnection(connectionInfo))
+ {
+ Console.WriteLine("connection = " + connection);
+ // wrong
+ Assert.Fail("Authentication succeeded but should've failed");
+ }
+ }
+
+ [Test]
+ [ExpectedException(typeof(AMQConnectionException))]
+ public void ConnectionFailure()
+ {
+ string url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5673?retries='0''";
+ new AMQConnection(QpidConnectionInfo.FromUrl(url));
+ Assert.Fail("Connection should not be established");
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
new file mode 100644
index 0000000000..b7973ae3f5
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// DurableSubscriptionTest checks that durable subscriptions work, by sending messages that can be picked up by
+ /// a subscription that is currently off-line, and checking that the subscriber gets all of its messages when it
+ /// does come on-line.
+ ///
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td>
+ /// </table>
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class DurableSubscriptionTest : BaseMessagingTestFixture
+ {
+ /// <summary>Used for debugging purposes.</summary>
+ private static ILog log = LogManager.GetLogger(typeof(DurableSubscriptionTest));
+
+ /// <summary>Defines the name of the test topic to use with the tests.</summary>
+ public const string TEST_ROUTING_KEY = "durablesubtestkey";
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ }
+
+ [TearDown]
+ public override void Shutdown()
+ {
+ base.Shutdown();
+ }
+
+ [Test]
+ public void TestDurableSubscriptionNoAck()
+ {
+ TestDurableSubscription(AcknowledgeMode.NoAcknowledge);
+ }
+
+ [Test]
+ public void TestDurableSubscriptionAutoAck()
+ {
+ TestDurableSubscription(AcknowledgeMode.AutoAcknowledge);
+ }
+
+ private void TestDurableSubscription(AcknowledgeMode ackMode)
+ {
+ // Create a topic with one producer and two consumers.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null);
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
+ true, "TestSubscription" + testId);
+
+ Thread.Sleep(500);
+
+ // Send messages and receive on both consumers.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+
+ ConsumeNMessagesOnly(1, "A", testConsumer[1]);
+ ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+
+ // Detach one consumer.
+ CloseEndPoint(2);
+
+ // Send message and receive on one consumer.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
+
+ ConsumeNMessagesOnly(1, "B", testConsumer[1]);
+
+ // Re-attach consumer, check that it gets the messages that it missed.
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
+ true, "TestSubscription" + testId);
+
+ ConsumeNMessagesOnly(1, "B", testConsumer[2]);
+
+ // Clean up any open consumers at the end of the test.
+ CloseEndPoint(2);
+ CloseEndPoint(1);
+ CloseEndPoint(0);
+ }
+
+ /// <summary> Check that an uncommitted receive can be re-received, on re-consume from the same durable subscription. </summary>
+ [Test]
+ public void TestUncommittedReceiveCanBeRereceivedNewConnection()
+ {
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
+
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
+ testChannel[0].Commit();
+
+ // Try to receive messages, but don't commit them.
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
+
+ // Close end-point 1 without committing the message, then re-open the subscription to consume again.
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
+
+ // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
+ ConsumeNMessagesOnly(1, "C", testConsumer[1]);
+ testChannel[1].Commit();
+ CloseEndPoint(1);
+ CloseEndPoint(0);
+ }
+
+ /// <summary> Check that a rolled back receive can be re-received, on re-consume from the same durable subscription. </summary>
+ [Test]
+ public void TestRolledBackReceiveCanBeRereceivedNewConnection()
+ {
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
+
+ // Send messages.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
+ testChannel[0].Commit();
+
+ // Try to receive messages, but roll them back.
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
+ testChannel[1].Rollback();
+
+ // Close end-point 1 without committing the message, then re-open the subscription to consume again.
+ CloseEndPoint(1);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
+ true, true, "foo"+testId);
+
+ // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
+ ConsumeNMessagesOnly(1, "D", testConsumer[1]);
+ testChannel[1].Commit();
+ CloseEndPoint(1);
+ CloseEndPoint(0);
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
new file mode 100644
index 0000000000..2094aa3b1b
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs
@@ -0,0 +1,282 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Sets up a producer/consumer pair to send test messages through a header exchange. The header exchange matching pattern is tested to
+ /// verify that it correctly matches or filters out messages based on their headers.
+ ///
+ /// Check that a message matching all fields of a headers exchange is passed by the exchange.
+ /// Check that a message containing values for empty fields of a headers exchange is passed by the exchange.
+ /// Check that a message matching only some fields of a headers exhcnage is not passed by the exchange.
+ /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange.
+ /// </summary>
+ ///
+ /// <todo>Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are implemented
+ /// in a single test class to make running this as part of an automated test suite possible.</todo>
+ ///
+ /// <todo>Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but using the
+ /// callback does demonstrate how to do so.</todo>
+ [TestFixture, Category("Integration")]
+ public class HeadersExchangeTest : BaseMessagingTestFixture
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
+
+ /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
+ private static readonly int TIMEOUT = 2000;
+
+ /// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
+ private string _exchangeName = "ServiceQ1";
+
+ /// <summary> Used to preserve the most recent exception in case test cases need to examine it. </summary>
+ private Exception _lastException = null;
+
+ /// <summary> Used to preserve the most recent message from the test consumer. </summary>
+ private IMessage _lastMessage = null;
+
+ /// <summary> The test consumer to get messages from the broker with. </summary>
+ private IMessageConsumer _consumer;
+
+ private IMessagePublisher _publisher;
+
+ private AutoResetEvent _evt = new AutoResetEvent(false);
+
+ private MessageReceivedDelegate _msgRecDelegate;
+ private ExceptionListenerDelegate _exceptionDelegate;
+
+ /// <summary> Holds the test connection. </summary>
+ protected IConnection _connection;
+
+ /// <summary> Holds the test channel. </summary>
+ protected IChannel _channel;
+
+ [SetUp]
+ public override void Init()
+ {
+ // Ensure that the base init method is called. It establishes a connection with the broker.
+ base.Init();
+
+ connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
+ _connection = new AMQConnection(connectionInfo);
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 500, 300);
+
+ _logger.Info("Starting...");
+ _logger.Info("Exchange name is '" + _exchangeName + "'...");
+
+ // Register this to listen for exceptions on the test connection.
+ _exceptionDelegate = new ExceptionListenerDelegate(OnException);
+ _connection.ExceptionListener += _exceptionDelegate;
+
+ // Declare a new headers exchange with the name of the test service.
+ _channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS);
+
+ // Create a non-durable, temporary (aka auto-delete), exclusive queue.
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ // Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match.
+ _channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable());
+
+ // Create a test consumer to consume messages from the test exchange.
+ _consumer = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(500)
+ .WithNoLocal(false) // make sure we get our own messages
+ .Create();
+
+ // Register this to listen for messages on the consumer.
+ _msgRecDelegate = new MessageReceivedDelegate(OnMessage);
+ _consumer.OnMessage += _msgRecDelegate;
+
+ // Clear the most recent message and exception.
+ _lastException = null;
+ _lastMessage = null;
+
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(_exchangeName)
+ .WithMandatory(true)
+ .Create();
+
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ // Start all channel
+ _connection.Start();
+ }
+
+ /// <summary>
+ /// Deregisters the on message delegate before closing the connection.
+ /// </summary>
+ [TearDown]
+ public override void Shutdown()
+ {
+ _logger.Info("public void Shutdown(): called");
+
+ //_consumer.OnMessage -= _msgRecDelegate;
+ //_connection.ExceptionListener -= _exceptionDelegate;
+
+ _connection.Stop();
+ _connection.Close();
+ _connection.Dispose();
+
+ base.Shutdown();
+ }
+
+ /// <summary>
+ /// Callback method that is passed any messages received on the test channel.
+ /// </summary>
+ ///
+ /// <param name="message">The received message.</param>
+ public void OnMessage(IMessage message)
+ {
+ _logger.Debug(string.Format("message.Type = {0}", message.GetType()));
+ _logger.Debug("Got message '" + message + "'");
+
+ // Preserve the most recent exception so that test cases can examine it.
+ _lastMessage = message;
+
+ // Notify any waiting threads that a message has been received.
+ _evt.Set();
+ }
+
+ /// <summary>Callback method to handle any exceptions raised by the test connection.</summary>
+ ///
+ /// <param name="e">The connection exception.</param>
+ public void OnException(Exception e)
+ {
+ // Preserve the most recent exception in case test cases need to examine it.
+ _lastException = e;
+
+ // Notify any waiting threads that an exception event has occurred.
+ _evt.Set();
+ }
+
+ /// <summary>Check that a message matching all fields of a headers exchange is passed by the exchange.</summary>
+ [Test]
+ public void TestMatchAll()
+ {
+ IMessage msg = _channel.CreateTextMessage("matches match2=''");
+ msg.Headers["match1"] = "foo";
+ msg.Headers["match2"] = "";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and received.
+ SendTestMessage(msg, true);
+ }
+
+ /// <summary>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</summary>
+ [Test]
+ public void TestMatchEmptyMatchesAnything()
+ {
+ // Send a test message that matches the headers exchange.
+ IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'");
+ msg.Headers["match1"] = "foo";
+ msg.Headers["match2"] = "bar";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and received.
+ SendTestMessage(msg, true);
+ }
+
+ /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
+ [Test]
+ public void TestMatchOneFails()
+ {
+ IMessage msg = _channel.CreateTextMessage("not match - only match1");
+ msg.Headers["match1"] = "foo";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and not received.
+ SendTestMessage(msg, false);
+ }
+
+ /// <summary>
+ /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by
+ /// the exchange.
+ /// </summary>
+ [Test]
+ public void TestMatchExtraFields()
+ {
+ IMessage msg = _channel.CreateTextMessage("matches - extra headers");
+ msg.Headers["match1"] = "foo";
+ msg.Headers["match2"] = "bar";
+ msg.Headers["match3"] = "not required";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and received.
+ SendTestMessage(msg, true);
+ }
+
+ /// <summary>
+ /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
+ /// depending on whether or not the message should be received by the consumer.
+ ///
+ /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
+ /// </summary>
+ ///
+ /// <param name="msgSend">The message to send.</param>
+ /// <param name="shouldPass">A flag to indicate whether or not the message should be received by the consumer.</param>
+ private void SendTestMessage(IMessage msgSend, bool shouldPass)
+ {
+ _publisher.Send(msgSend);
+ _evt.WaitOne(TIMEOUT, true);
+
+ // Check that an exception other than not routable was raised in which case re-raise it as a test error.
+ if (_lastException != null && !(_lastException.InnerException is AMQUndeliveredException))
+ {
+ Assert.Fail("Exception {0} was raised by the broker connection.", _lastException);
+ }
+ // Check that a message was returned if the test is expecting the message to pass.
+ else if (shouldPass)
+ {
+ Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange.");
+ }
+ // Check that a not routable exception was raised if the test is expecting the message to fail.
+ else if (_lastException != null && _lastException.InnerException is AMQUndeliveredException)
+ {
+ Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it.");
+ }
+ // The broker did not respond within the test timeout so fail the test.
+ else
+ {
+ Assert.Fail("The test timed out without a response from the broker.");
+ }
+ }
+
+ /// <summary> Returns a field table containing patterns to match the test header exchange against. </summary>
+ ///
+ /// <returns> A field table containing test patterns. </returns>
+ private FieldTable CreatePatternAsFieldTable()
+ {
+ FieldTable matchTable = new FieldTable();
+
+ matchTable["match1"] = "foo";
+ matchTable["match2"] = "";
+ matchTable["x-match"] = "all";
+
+ return matchTable;
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs
new file mode 100644
index 0000000000..4abc56905f
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// MandatoryMessageTest checks that messages sent with the 'mandatory' flag, must either be routed to a valid
+ /// queue or returned to the sender when no route is available.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Check default exchange returns unroutable mandatory messages.
+ /// <tr><td> Check direct exchange returns unroutable mandatory messages.
+ /// <tr><td> Check headers exchange returns unroutable mandatory messages.
+ /// <tr><td> Check topic exchange returns unroutable mandatory messages.
+ /// </table>
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class MandatoryMessageTest : BaseMessagingTestFixture
+ {
+ /// <summary>Used for debugging purposes.</summary>
+ private static ILog log = LogManager.GetLogger(typeof(MandatoryMessageTest));
+
+ /// <summary>Defines the maximum time in milliseconds, to wait for redelivery to occurr.</summary>
+ public const int TIMEOUT = 1000;
+
+ /// <summary>Defines the name of the routing key to use with the tests.</summary>
+ public const string TEST_ROUTING_KEY = "unboundkey";
+
+ /// <summary>Condition used to coordinate receipt of redelivery exception to the sending thread.</summary>
+ private ManualResetEvent errorEvent;
+
+ /// <summary>Holds the last received error condition, for examination by the tests sending thread.</summary>
+ private Exception lastErrorException;
+
+ /// <summary> Holds the test connection. </summary>
+ protected IConnection _connection;
+
+ /// <summary> Holds the test channel. </summary>
+ protected IChannel _channel;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+
+ errorEvent = new ManualResetEvent(false);
+ lastErrorException = null;
+ }
+
+ [TearDown]
+ public override void Shutdown()
+ {
+ base.Shutdown();
+ }
+
+ /// <summary>
+ /// Handles all exception conditions on the connection. The error event is notified and the exception recorded as the last seen.
+ /// </summary>
+ ///
+ /// <param name="e">The asynchronous exception on the connection.</param>
+ public void OnException(Exception e)
+ {
+ lastErrorException = e;
+ errorEvent.Set();
+ }
+
+ [Test]
+ public void SendUndeliverableMessageOnDirectExchange()
+ {
+ SendOne(ExchangeNameDefaults.DIRECT);
+ }
+
+ [Test]
+ public void SendUndeliverableMessageOnTopicExchange()
+ {
+ SendOne(ExchangeNameDefaults.TOPIC);
+ }
+
+ [Test]
+ public void SendUndeliverableMessageOnHeadersExchange()
+ {
+ SendOne(ExchangeNameDefaults.HEADERS);
+ }
+
+ /// <summary>
+ /// Sends a single message to the specified exchange with the routing key 'unboundkey', marked as mandatory.
+ /// A check is performed to assert that a redelivery error is returned from the broker for the message.
+ /// </summary>
+ ///
+ /// <param name="exchangeName">The name of the exchange to send to.</param>
+ private void SendOne(string exchangeName)
+ {
+ log.Debug("private void SendOne(string exchangeName = " + exchangeName + "): called");
+
+ // Send a test message to a unbound key on the specified exchange.
+ SetUpEndPoint(0, false, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, exchangeName,
+ true, false, null);
+ testProducer[0] = testChannel[0].CreatePublisherBuilder()
+ .WithRoutingKey(TEST_ROUTING_KEY + testId)
+ .WithMandatory(true)
+ .WithExchangeName(exchangeName)
+ .Create();
+
+ // Set up the exception listener on the connection.
+ testConnection[0].ExceptionListener = new ExceptionListenerDelegate(OnException);
+
+ // Send message that should fail.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("Test Message"));
+
+ // Wait for up to the timeout for a redelivery exception to be returned.
+ errorEvent.WaitOne(TIMEOUT, true);
+
+ // Asserts that a redelivery exception was returned, and is of the correct type.
+ Type expectedException = typeof(AMQUndeliveredException);
+ Exception ex = lastErrorException;
+
+ Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException);
+ Assert.IsInstanceOfType(expectedException, ex.InnerException);
+
+ CloseEndPoint(0);
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
new file mode 100644
index 0000000000..bae6c76818
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// ProducerMultiConsumerTest provides some tests for one producer and multiple consumers.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Check that all consumers on a topic each receive all message on it.
+ /// <tr><td> Check that consumers on the same queue receive each message once accross all consumers.
+ /// </table>
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class ProducerMultiConsumerTest : BaseMessagingTestFixture
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumerTest));
+
+ /// <summary>Base name for the routing key used for this test (made unique by adding in test id).</summary>
+ private const string TEST_ROUTING_KEY = "ProducerMultiConsumerTest";
+
+ /// <summary>The number of consumers to test.</summary>
+ private const int CONSUMER_COUNT = 5;
+
+ /// <summary>The number of test messages to send.</summary>
+ private const int MESSAGE_COUNT = 10;
+
+ /// <summary>Monitor used to signal succesfull receipt of all test messages.</summary>
+ AutoResetEvent _finishedEvent;
+
+ /// <summary>Used to count test messages received so far.</summary>
+ private int _messageReceivedCount;
+
+ /// <summary>Used to hold the expected number of messages to receive.</summary>
+ private int expectedMessageCount;
+
+ /// <summary>Flag used to indicate that all messages really were received, and that the test did not just time out. </summary>
+ private bool allReceived;
+
+ /// <summary> Creates one producing end-point and many consuming end-points connected on a topic. </summary>
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+
+ // Reset all test counts and flags.
+ _messageReceivedCount = 0;
+ allReceived = false;
+ _finishedEvent = new AutoResetEvent(false);
+ }
+
+ /// <summary> Cleans up all test end-points. </summary>
+ [TearDown]
+ public override void Shutdown()
+ {
+ try
+ {
+ // Close all end points for producer and consumers.
+ // Producer is on 0, and consumers on 1 .. n, so loop is from 0 to n inclusive.
+ for (int i = 0; i <= CONSUMER_COUNT; i++)
+ {
+ CloseEndPoint(i);
+ }
+ }
+ finally
+ {
+ base.Shutdown();
+ }
+ }
+
+ /// <summary> Check that all consumers on a topic each receive all message on it. </summary>
+ [Test]
+ public void AllConsumerReceiveAllMessagesOnTopic()
+ {
+ // Create end-points for all the consumers in the test.
+ for (int i = 1; i <= CONSUMER_COUNT; i++)
+ {
+ SetUpEndPoint(i, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+ testConsumer[i].OnMessage += new MessageReceivedDelegate(OnMessage);
+ }
+
+ // Create an end-point to publish to the test topic.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC,
+ true, false, null);
+
+ expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
+
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ }
+
+ _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
+
+ // Check that all messages really were received.
+ Assert.IsTrue(allReceived, "All messages were not received, only got " + _messageReceivedCount + " but wanted " + expectedMessageCount);
+ }
+
+ /// <summary> Check that consumers on the same queue receive each message once accross all consumers. </summary>
+ [Test]
+ public void AllConsumerReceiveAllMessagesOnDirect()
+ {
+ // Create end-points for all the consumers in the test.
+ for (int i = 1; i <= CONSUMER_COUNT; i++)
+ {
+ SetUpEndPoint(i, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+ testConsumer[i].OnMessage += new MessageReceivedDelegate(OnMessage);
+ }
+
+ // Create an end-point to publish to the test topic.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
+ expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
+
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
+ }
+
+ _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
+
+ // Check that all messages really were received.
+ Assert.IsTrue(allReceived, "All messages were not received, only got: " + _messageReceivedCount + " but wanted " + expectedMessageCount);
+ }
+
+ /// <summary> Atomically increments the message count on every message, and signals once all messages in the test are received. </summary>
+ public void OnMessage(IMessage m)
+ {
+ int newCount = Interlocked.Increment(ref _messageReceivedCount);
+
+ if (newCount >= expectedMessageCount)
+ {
+ allReceived = true;
+ _finishedEvent.Set();
+ }
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj b/qpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj
new file mode 100755
index 0000000000..01ca2cc5bd
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj
@@ -0,0 +1,64 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <ProjectGuid>{EFEB9E41-B66E-4674-85F7-18FAD056AD67}</ProjectGuid>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <OutputType>Exe</OutputType>
+ <RootNamespace>Qpid.Integration.Tests</RootNamespace>
+ <AssemblyName>Qpid.Integration.Tests</AssemblyName>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
+ <OutputPath>bin\Debug\</OutputPath>
+ <DebugSymbols>True</DebugSymbols>
+ <DebugType>Full</DebugType>
+ <Optimize>False</Optimize>
+ <CheckForOverflowUnderflow>True</CheckForOverflowUnderflow>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
+ <OutputPath>bin\Release\</OutputPath>
+ <DebugSymbols>False</DebugSymbols>
+ <DebugType>None</DebugType>
+ <Optimize>True</Optimize>
+ <CheckForOverflowUnderflow>False</CheckForOverflowUnderflow>
+ <DefineConstants>TRACE</DefineConstants>
+ </PropertyGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.Targets" />
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BaseMessagingTestFixture.cs" />
+ <Compile Include="ChannelQueueTest.cs" />
+ <Compile Include="CommitRollbackTest.cs" />
+ <Compile Include="ConnectionTest.cs" />
+ <Compile Include="DurableSubscriptionTest.cs" />
+ <Compile Include="HeadersExchangeTest.cs" />
+ <Compile Include="MandatoryMessageTest.cs" />
+ <Compile Include="ProducerMultiConsumerTest.cs" />
+ <Compile Include="SslConnectionTest.cs" />
+ <Compile Include="QueueBrowsingTest.cs" />
+ </ItemGroup>
+</Project>
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs
new file mode 100644
index 0000000000..536439a44b
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ [TestFixture, Category("Integration")]
+ public class QueueBrowsingTest : BaseMessagingTestFixture
+ {
+ /// <summary>Used for debugging purposes.</summary>
+ private static ILog log = LogManager.GetLogger(typeof(QueueBrowsingTest));
+
+ public const string TEST_ROUTING_KEY = "queuebrowsingkey";
+ public const string TEST_ROUTING_KEY2 = "lvqbrowsingkey";
+
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ }
+
+ [TearDown]
+ public override void Shutdown()
+ {
+ base.Shutdown();
+ }
+
+ [Test]
+ public void TestQueueBrowsing()
+ {
+ // Create a topic with one producer and two consumers.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, null, false, false);
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY + testId, false, true);
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY + testId, false, true);
+
+ Thread.Sleep(500);
+
+ // Send messages and receive on both consumers.
+ testProducer[0].Send(testChannel[0].CreateTextMessage("msg"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("msg"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("msg"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("msg"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("msg"));
+ testProducer[0].Send(testChannel[0].CreateTextMessage("msg"));
+
+ Thread.Sleep(2000);
+
+
+ ConsumeNMessagesOnly(6, "msg", testConsumer[1]);
+ ConsumeNMessagesOnly(6, "msg", testConsumer[2]);
+
+ // Clean up any open consumers at the end of the test.
+ CloseEndPoint(2);
+ CloseEndPoint(1);
+ CloseEndPoint(0);
+ }
+
+ [Test]
+ public void TestQueueBrowsingLVQ()
+ {
+ // Create a topic with one producer and two consumers.
+ SetUpEndPoint(0, true, false, TEST_ROUTING_KEY2 + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY2 + testId, false, false);
+ FieldTable args = new FieldTable();
+ args.SetBoolean("qpid.last_value_queue", true);
+ args.SetString("qpid.last_value_queue_key", "key");
+ testChannel[0].DeclareQueue(TEST_ROUTING_KEY2 + testId, true, false, false, args);
+ testChannel[0].Bind(TEST_ROUTING_KEY2 + testId, ExchangeNameDefaults.DIRECT, TEST_ROUTING_KEY2 + testId);
+ Thread.Sleep(500);
+
+
+ for (int i = 0; i < 12; i++)
+ {
+ ITextMessage msg = testChannel[0].CreateTextMessage("msg");
+ msg.Headers.SetInt("key", i%6);
+ testProducer[0].Send(msg);
+ }
+
+ Thread.Sleep(2000);
+
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY2 + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY2 + testId, false, true);
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY2 + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY2 + testId, false, true);
+
+ Thread.Sleep(500);
+
+
+ ConsumeNMessagesOnly(6, "msg", testConsumer[1]);
+ ConsumeNMessagesOnly(6, "msg", testConsumer[2]);
+
+ // Clean up any open consumers at the end of the test.
+ CloseEndPoint(2);
+ CloseEndPoint(1);
+ CloseEndPoint(0);
+ }
+
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
new file mode 100644
index 0000000000..5f953e1470
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Reflection;
+using System.Security.Cryptography.X509Certificates;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Test SSL/TLS connections to the broker
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class SslConnectionTest
+ {
+ /// <summary>
+ /// Make a test TLS connection to the broker
+ /// without using client-certificates
+ /// </summary>
+ //[Test]
+ public void DoSslConnection()
+ {
+ // because for tests we don't usually trust the server certificate
+ // we need here to tell the client to ignore certificate validation errors
+ SslOptions sslConfig = new SslOptions(null, true);
+
+ MakeBrokerConnection(sslConfig);
+ }
+
+ private static void MakeBrokerConnection(SslOptions options)
+ {
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.VirtualHost = "test";
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 8672, options));
+
+ using ( IConnection connection = new AMQConnection(connectionInfo) )
+ {
+ Console.WriteLine("connection = " + connection);
+ }
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs
new file mode 100644
index 0000000000..4074055eba
--- /dev/null
+++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Reflection;
+using System.Threading;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+using log4net;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Runs through the range of ack modes for each test case, sending and recieving a large number of messages
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class SustainedTest : BaseMessagingTestFixture
+ {
+ /// <summary>The number of test messages to send.</summary>
+ private const int MESSAGE_COUNT = 50;//00;
+
+ /// <summary>Base name for the routing key used for this test (made unique by adding in test id).</summary>
+ private const string TEST_ROUTING_KEY = "MessageOrderTest";
+
+ /// <summary>
+ /// The logger
+ /// </summary>
+ private static ILog _logger = LogManager.GetLogger(typeof(SustainedTest));
+
+ [Test]
+ public void MessageOrderTestAutoAck()
+ {
+ MessageOrderTest(AcknowledgeMode.AutoAcknowledge);
+ }
+
+ [Test]
+ public void MessageOrderTestNoAck()
+ {
+ MessageOrderTest(AcknowledgeMode.NoAcknowledge);
+ }
+
+ public void MessageOrderTest(AcknowledgeMode consumerMode)
+ {
+
+ // Consumer
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY, consumerMode, false, ExchangeNameDefaults.DIRECT,
+ true, false, null);
+
+
+ Console.WriteLine("Starting producer thread");
+ Thread prodThread = new Thread(new ThreadStart(SendMessages));
+ prodThread.Start();
+
+ Thread.Sleep(2000);
+ Console.WriteLine("Starting consuming");
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ if ((i % 10) == 0)
+ {
+ Console.WriteLine("Consuming message "+i);
+ }
+ ConsumeNMessages(1, "Msg"+i, testConsumer[1]);
+ }
+ prodThread.Join();
+ CloseEndPoint(0);
+ CloseEndPoint(1);
+ }
+
+ private static void SendMessages()
+ {
+ AMQConnection conn = new AMQConnection(QpidConnectionInfo.FromUrl(BaseMessagingTestFixture.connectionUri));
+ conn.Start();
+ IChannel channel = conn.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
+ IMessagePublisher producer = channel.CreatePublisherBuilder().
+ WithExchangeName(ExchangeNameDefaults.DIRECT).
+ WithRoutingKey(TEST_ROUTING_KEY).
+ Create();
+
+ for (int i = 0; i < MESSAGE_COUNT ; i++)
+ {
+ if ((i % 10) == 0)
+ {
+ Console.WriteLine("Sending message "+i);
+ }
+ producer.Send(channel.CreateTextMessage("Msg" + i));
+ }
+ }
+ }
+}