summaryrefslogtreecommitdiff
path: root/qpid/wcf/test
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/test')
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs190
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs173
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelAbortCommitTest.cs113
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs229
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs72
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs280
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs138
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/CustomAmqpBindingTest.cs77
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj121
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IGenericObjectService.cs30
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IInteropService.cs31
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs33
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs33
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService3.cs33
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs30
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs30
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs134
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs144
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageProperties.txt22
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessagePropertiesTest.cs131
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs198
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MultipleEndpointsSameQueueTest.cs83
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Properties/AssemblyInfo.cs55
-rwxr-xr-xqpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat34
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs157
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs161
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs783
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj83
28 files changed, 3598 insertions, 0 deletions
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
new file mode 100644
index 0000000000..23bed6c603
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
@@ -0,0 +1,190 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class AsyncTest
+ {
+ private const int MessageCount = 20;
+ private const string Queue = "amqp:amq.direct?routingkey=routing_key";
+ private Uri endpoint = new Uri("amqp:message_queue");
+ private TimeSpan standardTimeout = TimeSpan.FromSeconds(10.0); // seconds
+
+ [Test]
+ public void NonTryReceives()
+ {
+ this.SendMessages(this.standardTimeout, this.standardTimeout);
+ this.ReceiveNonTryMessages(this.standardTimeout, this.standardTimeout);
+ }
+
+ [Test]
+ public void TryReceives()
+ {
+ this.SendMessages(this.standardTimeout, this.standardTimeout);
+ this.ReceiveTryMessages(this.standardTimeout, this.standardTimeout);
+ }
+
+ [Test]
+ public void SmallTimeout()
+ {
+ // This code is commented out due to a bug in asynchronous channel open.
+ ////IChannelListener parentListener;
+ ////try
+ ////{
+ //// this.RetrieveAsyncChannel(new Uri("amqp:fake_queue_do_not_create"), TimeSpan.FromMilliseconds(10.0), out parentListener);
+ //// parentListener.Close();
+ //// Assert.Fail("Accepting the channel did not time out.");
+ ////}
+ ////catch (TimeoutException)
+ ////{
+ //// // Intended exception.
+ ////}
+
+ try
+ {
+ this.ReceiveNonTryMessages(this.standardTimeout, TimeSpan.FromMilliseconds(10.0));
+ Assert.Fail("Receiving a message did not time out.");
+ }
+ catch (TimeoutException)
+ {
+ // Intended exception.
+ }
+ }
+
+ private void SendMessages(TimeSpan channelTimeout, TimeSpan messageSendTimeout)
+ {
+ ChannelFactory<IOutputChannel> channelFactory =
+ new ChannelFactory<IOutputChannel>(Util.GetBinding(), Queue);
+ IOutputChannel proxy = channelFactory.CreateChannel();
+ IAsyncResult[] resultArray = new IAsyncResult[MessageCount];
+
+ for (int i = 0; i < MessageCount; i++)
+ {
+ Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, i);
+ resultArray[i] = proxy.BeginSend(toSend, messageSendTimeout, null, null);
+ }
+
+ for (int j = 0; j < MessageCount; j++)
+ {
+ proxy.EndSend(resultArray[j]);
+ }
+
+ IAsyncResult iocCloseResult = proxy.BeginClose(channelTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ proxy.EndClose(iocCloseResult);
+
+ IAsyncResult chanFactCloseResult = channelFactory.BeginClose(channelTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ channelFactory.EndClose(chanFactCloseResult);
+ }
+
+ private void ReceiveNonTryMessages(TimeSpan channelTimeout, TimeSpan messageTimeout)
+ {
+ IChannelListener inputChannelParentListener;
+ IInputChannel inputChannel = this.RetrieveAsyncChannel(this.endpoint, channelTimeout, out inputChannelParentListener);
+
+ inputChannel.Open();
+
+ IAsyncResult[] resultArray = new IAsyncResult[MessageCount];
+ try
+ {
+ for (int i = 0; i < MessageCount; i++)
+ {
+ resultArray[i] = inputChannel.BeginReceive(messageTimeout, null, null);
+ }
+
+ for (int j = 0; j < MessageCount; j++)
+ {
+ inputChannel.EndReceive(resultArray[j]);
+ }
+ }
+ finally
+ {
+ IAsyncResult channelCloseResult = inputChannel.BeginClose(channelTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ inputChannel.EndClose(channelCloseResult);
+
+ // Asynchronous listener close has not been implemented.
+ ////IAsyncResult listenerCloseResult = inputChannelParentListener.BeginClose(channelTimeout, null, null);
+ ////Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ ////inputChannelParentListener.EndClose(listenerCloseResult);
+
+ inputChannelParentListener.Close();
+ }
+ }
+
+ private void ReceiveTryMessages(TimeSpan channelAcceptTimeout, TimeSpan messageReceiveTimeout)
+ {
+ IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(this.endpoint, new BindingParameterCollection());
+ listener.Open();
+ IInputChannel inputChannel = listener.AcceptChannel(channelAcceptTimeout);
+ IAsyncResult channelResult = inputChannel.BeginOpen(channelAcceptTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0));
+ inputChannel.EndOpen(channelResult);
+
+ IAsyncResult[] resultArray = new IAsyncResult[MessageCount];
+
+ for (int i = 0; i < MessageCount; i++)
+ {
+ resultArray[i] = inputChannel.BeginTryReceive(messageReceiveTimeout, null, null);
+ }
+
+ for (int j = 0; j < MessageCount; j++)
+ {
+ Message tempMessage;
+ Assert.True(inputChannel.EndTryReceive(resultArray[j], out tempMessage), "Did not successfully receive message #{0}", j);
+ }
+
+ inputChannel.Close();
+ listener.Close();
+ }
+
+ private IInputChannel RetrieveAsyncChannel(Uri queue, TimeSpan timeout, out IChannelListener parentListener)
+ {
+ IChannelListener<IInputChannel> listener =
+ Util.GetBinding().BuildChannelListener<IInputChannel>(queue, new BindingParameterCollection());
+ listener.Open();
+ IInputChannel inputChannel;
+
+ try
+ {
+ IAsyncResult acceptResult = listener.BeginAcceptChannel(timeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(300.0)); // Dummy work
+ inputChannel = listener.EndAcceptChannel(acceptResult);
+ }
+ catch (TimeoutException)
+ {
+ listener.Close();
+ throw;
+ }
+ finally
+ {
+ parentListener = listener;
+ }
+ return inputChannel;
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs
new file mode 100644
index 0000000000..fa3b79d3a7
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs
@@ -0,0 +1,173 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.ServiceModel;
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class BasicTransactionTest
+ {
+ private const string SendToUri = "amqp:amq.direct?routingkey=routing_key";
+ private const string ListenUri = "amqp:message_queue";
+
+ private MessageClient client;
+
+ [SetUp]
+ public void Setup()
+ {
+ // Create client
+ this.client = new MessageClient();
+ this.client.NumberOfMessages = 3;
+ this.client.NumberOfIterations = 1;
+
+ // Setup service
+ MessageService.EndpointAddress = ListenUri;
+ MessageService.ContractTypes = new List<Type>();
+ MessageService.CompletionHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
+ }
+
+ [TestCase(true)]
+ [TestCase(false)]
+ public void TransactionalSend(bool commitTransaction)
+ {
+ int expectedMessageCount = 0;
+ this.client.TransactedSend = true;
+
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTSRAttribute));
+ this.client.CommitTransaction = commitTransaction;
+
+ if (commitTransaction)
+ {
+ expectedMessageCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+ }
+
+ // Call Service methods.
+ this.SendMessages(String.Empty);
+
+ // Validate results.
+ int actualMessageCount = Util.GetMessageCountFromQueue(ListenUri);
+ Assert.AreEqual(expectedMessageCount, actualMessageCount, "The actual message count wasn't as expected.");
+ }
+
+ [TestCase("UseTransactionScope", true)]
+ [TestCase("UseTransactionScope", false)]
+ [TestCase("UseTSRAttribute", true)]
+ [TestCase("UseTSRAttribute", false)]
+ public void TransactionalReceive(string testVariation, bool commitTransaction)
+ {
+ bool testPassed = true;
+ int expectedMessageCount = 0;
+ this.client.TransactedSend = false;
+ string transactionOutcome = "Commit";
+
+ switch (testVariation.Trim().ToLower())
+ {
+ case "usetransactionscope":
+ {
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTransactionScope));
+ }
+
+ break;
+ case "usetsrattribute":
+ {
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTSRAttribute));
+ }
+
+ break;
+ }
+
+ int expectedMethodCallCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+
+ if (!commitTransaction)
+ {
+ expectedMessageCount = expectedMethodCallCount;
+ transactionOutcome = "Abort";
+ }
+
+ MessageService.IntendedInvocationCount = expectedMethodCallCount;
+
+ // Start the service.
+ MessageService.StartService(Util.GetBinding());
+
+ // Call Service methods.
+ this.SendMessages(transactionOutcome);
+
+ // Allow the wcf service to process all the messages before validation.
+ if (!MessageService.CompletionHandle.WaitOne(TimeSpan.FromSeconds(10.0), false))
+ {
+ Console.WriteLine("The service did not finish processing messages in 10 seconds. Test will be FAILED");
+ testPassed = false;
+ }
+
+ MessageService.StopService();
+
+ // Validate results.
+ if (expectedMethodCallCount > MessageService.TotalMethodCallCount)
+ {
+ Console.WriteLine("The expected method call count was {0} but got {1}.", expectedMethodCallCount, MessageService.TotalMethodCallCount);
+ testPassed = false;
+ }
+
+ int actualMessageCount = Util.GetMessageCountFromQueue(ListenUri);
+ if (expectedMessageCount != actualMessageCount)
+ {
+ Console.WriteLine("The expected message count was {0} but got {1}.", expectedMessageCount, actualMessageCount);
+ testPassed = false;
+ }
+
+ Assert.AreEqual(true, testPassed, "Results not as expected. Testcase FAILED.");
+
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ if (MessageService.IsServiceRunning())
+ {
+ MessageService.StopService();
+ }
+ }
+
+ private void SendMessages(string messageString)
+ {
+ // Create messages to send.
+ string[] messages = new string[this.client.NumberOfMessages];
+ for (int i = 0; i < this.client.NumberOfMessages; ++i)
+ {
+ messages[i] = messageString + " Message " + i;
+ }
+
+ // Create Amqpchannel and send messages.
+ MethodInfo runClientMethod = this.client.GetType().GetMethod("RunTestClient");
+ EndpointAddress address = new EndpointAddress(SendToUri);
+
+ foreach (Type contractType in MessageService.ContractTypes)
+ {
+ MethodInfo runClientT = runClientMethod.MakeGenericMethod(contractType);
+ runClientT.Invoke(this.client, new object[] { address, messages });
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelAbortCommitTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelAbortCommitTest.cs
new file mode 100644
index 0000000000..9c9a6c095e
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelAbortCommitTest.cs
@@ -0,0 +1,113 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.Collections.Generic;
+ using System.ServiceModel.Channels;
+ using NUnit.Framework;
+
+ [TestFixture(1, true, false, true, true)]
+ [TestFixture(1, true, false, true, false)]
+ [TestFixture(1, true, false, false, true)]
+ [TestFixture(1, true, false, false, false)]
+ [TestFixture(1, false, true, true, true)]
+ [TestFixture(1, false, true, true, false)]
+ [TestFixture(1, false, true, false, true)]
+ [TestFixture(1, false, true, false, false)]
+ [TestFixture(5, true, false, true, true)]
+ [TestFixture(5, true, false, true, false)]
+ [TestFixture(5, true, false, false, true)]
+ [TestFixture(5, true, false, false, false)]
+ [TestFixture(5, false, true, true, true)]
+ [TestFixture(5, false, true, true, false)]
+ [TestFixture(5, false, true, false, true)]
+ [TestFixture(5, false, true, false, false)]
+ public class ChannelAbortCommitTest
+ {
+ private const string SendToUri = "amqp:amq.direct?routingkey=routing_key";
+ private const string ListenUri = "amqp:message_queue";
+
+ private ChannelContextParameters contextParameters;
+ private Binding channelBinding;
+ private List<string> expectedResults;
+
+ public ChannelAbortCommitTest(int numberOfThreads, bool sendAbort, bool receiveAbort, bool asyncSend, bool asyncReceive)
+ {
+ this.contextParameters = new ChannelContextParameters();
+ this.contextParameters.NumberOfThreads = numberOfThreads;
+ this.contextParameters.SenderShouldAbort = sendAbort;
+ this.contextParameters.ReceiverShouldAbort = receiveAbort;
+ this.contextParameters.AsyncSend = asyncSend;
+ this.contextParameters.AsyncReceive = asyncReceive;
+ }
+
+ [SetUp]
+ public void Setup()
+ {
+ this.channelBinding = Util.GetBinding();
+ this.GenerateExpectedResults();
+ }
+
+ [Test]
+ public void Run()
+ {
+ ChannelReceiver receiver = new ChannelReceiver(this.contextParameters, this.channelBinding);
+ ChannelSender sender = new ChannelSender(this.contextParameters, this.channelBinding);
+
+ sender.Run(SendToUri);
+ receiver.Run(ListenUri);
+
+ // Validate results.
+ bool comparisonOutcome = Util.CompareResults(this.expectedResults, receiver.Results);
+ Assert.AreEqual(true, comparisonOutcome, "The actual results were not as expected");
+ Assert.AreEqual(0, Util.GetMessageCountFromQueue(ListenUri), "The actual message count wasn't as expected.");
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ Util.PurgeQueue(ListenUri);
+ }
+
+ private void GenerateExpectedResults()
+ {
+ this.expectedResults = new List<string>();
+
+ if (this.contextParameters.NumberOfThreads == 1)
+ {
+ this.expectedResults.Add("Received message with Action 'FirstMessage'");
+ this.expectedResults.Add("Received message with Action 'Message 1'");
+ this.expectedResults.Add("Received message with Action 'Message 2'");
+ this.expectedResults.Add("Received message with Action 'Message 3'");
+ this.expectedResults.Add("Received message with Action 'Message 4'");
+ this.expectedResults.Add("Received message with Action 'Message 5'");
+ }
+ else
+ {
+ this.expectedResults.Add("Received message with Action 'FirstMessage'");
+ this.expectedResults.Add("Received message with Action 'Message'");
+ this.expectedResults.Add("Received message with Action 'Message'");
+ this.expectedResults.Add("Received message with Action 'Message'");
+ this.expectedResults.Add("Received message with Action 'Message'");
+ this.expectedResults.Add("Received message with Action 'Message'");
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs
new file mode 100644
index 0000000000..35e32ce25a
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs
@@ -0,0 +1,229 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+
+ public class ChannelContextParameters
+ {
+ public ChannelContextParameters()
+ {
+ this.NumberOfMessages = 5;
+ this.NumberOfThreads = 1;
+ this.ReceiveTimeout = TimeSpan.FromSeconds(10.0);
+ this.WaitForSender = true;
+ this.UseAcceptChannelTimeout = true;
+ this.CreateChannel = true;
+ this.DoneSendingTimeout = TimeSpan.FromSeconds(10);
+ this.TransactionScopeTimeout = TimeSpan.FromMinutes(1);
+ this.AcceptChannelTimeout = TimeSpan.FromSeconds(10);
+ this.OpenTimeout = TimeSpan.FromSeconds(10);
+ this.ClientCommitDelay = TimeSpan.Zero;
+ this.WaitForChannelTimeout = TimeSpan.FromSeconds(5);
+ this.WaitForMessageTimeout = TimeSpan.FromSeconds(5);
+ }
+
+ public int NumberOfMessages
+ {
+ get;
+ set;
+ }
+
+ public int NumberOfThreads
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan ReceiveTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool SenderShouldAbort
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiverShouldAbort
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncSend
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncReceive
+ {
+ get;
+ set;
+ }
+
+ public bool SendWithoutTransaction
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiveWithoutTransaction
+ {
+ get;
+ set;
+ }
+
+ public bool SendWithMultipleTransactions
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiveWithMultipleTransactions
+ {
+ get;
+ set;
+ }
+
+ public bool CloseBeforeReceivingAll
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForSender
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan DoneSendingTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan TransactionScopeTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan AcceptChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan OpenTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool UseAcceptChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool CreateChannel
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan ClientCommitDelay
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncAccept
+ {
+ get;
+ set;
+ }
+
+ public bool CloseListenerEarly
+ {
+ get;
+ set;
+ }
+
+ public bool AbortTxDatagramAccept
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForChannel
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan WaitForChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncWaitForChannel
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForMessage
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan WaitForMessageTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncWaitForMessage
+ {
+ get;
+ set;
+ }
+
+ public bool TryReceive
+ {
+ get;
+ set;
+ }
+
+ public bool TryReceiveNullIAsyncResult
+ {
+ get;
+ set;
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs
new file mode 100644
index 0000000000..9cabae3201
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs
@@ -0,0 +1,72 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.ServiceModel.Channels;
+
+ public abstract class ChannelEntity
+ {
+ public ChannelEntity(ChannelContextParameters contextParameters, Binding channelBinding)
+ {
+ this.Parameters = contextParameters;
+ this.Binding = channelBinding;
+ this.Results = new List<string>();
+ }
+
+ protected ChannelContextParameters Parameters
+ {
+ get;
+ set;
+ }
+
+ protected Binding Binding
+ {
+ get;
+ set;
+ }
+
+ public List<string> Results
+ {
+ get;
+ set;
+ }
+
+ public abstract void Run(string serviceUri);
+
+ protected void WaitForChannel(IChannelListener listener, bool async, TimeSpan timeout)
+ {
+ bool ret = false;
+
+ if (async)
+ {
+ IAsyncResult result = listener.BeginWaitForChannel(timeout, null, null);
+ ret = listener.EndWaitForChannel(result);
+ }
+ else
+ {
+ ret = listener.WaitForChannel(timeout);
+ }
+
+ this.Results.Add(String.Format("WaitForChannel returned {0}", ret));
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs
new file mode 100644
index 0000000000..20af98fa64
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs
@@ -0,0 +1,280 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class ChannelReceiver : ChannelEntity
+ {
+ public ChannelReceiver(ChannelContextParameters contextParameters, Binding channelBinding)
+ : base(contextParameters, channelBinding)
+ {
+ }
+
+ public override void Run(string listenUri)
+ {
+ IChannelListener<IInputChannel> listener = this.Binding.BuildChannelListener<IInputChannel>(new Uri(listenUri));
+ listener.Open();
+
+ if (this.Parameters.WaitForChannel)
+ {
+ this.WaitForChannel(listener, this.Parameters.AsyncWaitForChannel, this.Parameters.WaitForChannelTimeout);
+ }
+
+ this.AcceptChannelAndReceive(listener);
+
+ if (listener.State != CommunicationState.Closed)
+ {
+ listener.Close();
+ }
+ }
+
+ private void AcceptChannelAndReceive(IChannelListener<IInputChannel> listener)
+ {
+ IInputChannel channel;
+ TransactionScope transactionToAbortOnAccept = null;
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept = new TransactionScope(TransactionScopeOption.RequiresNew);
+ }
+
+ if (this.Parameters.AsyncAccept)
+ {
+ IAsyncResult result = listener.BeginAcceptChannel(null, null);
+ channel = listener.EndAcceptChannel(result);
+ }
+ else
+ {
+ channel = listener.AcceptChannel();
+ }
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept.Dispose();
+ }
+
+ channel.Open();
+ Message message;
+
+ if (this.Parameters.CloseListenerEarly)
+ {
+ listener.Close();
+ }
+
+ try
+ {
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ Message firstMessage = channel.Receive(this.Parameters.ReceiveTimeout);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", firstMessage.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ }
+ catch (TimeoutException)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add("Receive timed out.");
+ }
+
+ channel.Abort();
+ return;
+ }
+
+ AutoResetEvent doneReceiving = new AutoResetEvent(false);
+ int threadsCompleted = 0;
+
+ for (int i = 0; i < this.Parameters.NumberOfThreads; ++i)
+ {
+ ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object unused)
+ {
+ do
+ {
+ if (this.Parameters.ReceiverShouldAbort)
+ {
+ this.ReceiveMessage(channel, false);
+ Thread.Sleep(200);
+ }
+
+ message = this.ReceiveMessage(channel, true);
+ }
+ while (message != null);
+
+ if (Interlocked.Increment(ref threadsCompleted) == this.Parameters.NumberOfThreads)
+ {
+ doneReceiving.Set();
+ }
+ }));
+ }
+
+ TimeSpan threadTimeout = TimeSpan.FromMinutes(2.0);
+ if (!doneReceiving.WaitOne(threadTimeout, false))
+ {
+ this.Results.Add(String.Format("Threads did not complete within {0}.", threadTimeout));
+ }
+
+ channel.Close();
+ }
+
+ private Message ReceiveMessage(IInputChannel channel, bool commit)
+ {
+ Message message = null;
+
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required))
+ {
+ bool messageDetected = false;
+ if (this.Parameters.AsyncWaitForMessage)
+ {
+ IAsyncResult result = channel.BeginWaitForMessage(this.Parameters.WaitForMessageTimeout, null, null);
+ messageDetected = channel.EndWaitForMessage(result);
+ }
+ else
+ {
+ messageDetected = channel.WaitForMessage(this.Parameters.WaitForMessageTimeout);
+ }
+
+ if (this.Parameters.WaitForMessage)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("WaitForMessage returned {0}", messageDetected));
+ }
+ }
+
+ if (messageDetected)
+ {
+ if (this.Parameters.AsyncReceive)
+ {
+ if (this.Parameters.TryReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ bool ret = channel.EndTryReceive(result, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ IAsyncResult result = channel.BeginReceive(this.Parameters.ReceiveTimeout, null, null);
+ message = channel.EndReceive(result);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ message = channel.Receive(this.Parameters.ReceiveTimeout);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = false;
+ if (this.Parameters.AsyncReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ if (this.Parameters.TryReceiveNullIAsyncResult)
+ {
+ try
+ {
+ channel.EndTryReceive(null, out message);
+ }
+ catch (Exception e)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive threw {0}", e.GetType().Name));
+ }
+ }
+ }
+
+ ret = channel.EndTryReceive(result, out message);
+ }
+ else
+ {
+ ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+ }
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ this.Results.Add(String.Format("Message was {0}", (message == null ? "null" : "not null")));
+ }
+ }
+
+ message = null;
+ }
+
+ if (commit && message != null)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", message.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ else
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+
+ return message;
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs
new file mode 100644
index 0000000000..78950dc0d5
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs
@@ -0,0 +1,138 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class ChannelSender : ChannelEntity
+ {
+ public ChannelSender(ChannelContextParameters contextParameters, Binding channelBinding)
+ : base(contextParameters, channelBinding)
+ {
+ }
+
+ public override void Run(string sendTo)
+ {
+ IChannelFactory<IOutputChannel> factory = this.Binding.BuildChannelFactory<IOutputChannel>();
+ factory.Open();
+
+ if (this.Parameters.CreateChannel)
+ {
+ IOutputChannel channel = factory.CreateChannel(new EndpointAddress(sendTo));
+ this.SendMessages(channel);
+ }
+
+ factory.Close();
+ }
+
+ private void SendMessages(IOutputChannel channel)
+ {
+ channel.Open();
+
+ AutoResetEvent doneSending = new AutoResetEvent(false);
+ int threadsCompleted = 0;
+
+ if (this.Parameters.NumberOfMessages > 0)
+ {
+ this.SendMessage(channel, "FirstMessage", true);
+ }
+
+ if (this.Parameters.NumberOfThreads == 1)
+ {
+ for (int j = 0; j < this.Parameters.NumberOfMessages; ++j)
+ {
+ if (this.Parameters.SenderShouldAbort)
+ {
+ this.SendMessage(channel, "Message " + (j + 1), false);
+ }
+
+ this.SendMessage(channel, "Message " + (j + 1), true);
+ }
+
+ doneSending.Set();
+ }
+ else
+ {
+ for (int i = 0; i < this.Parameters.NumberOfThreads; ++i)
+ {
+ ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object unused)
+ {
+ for (int j = 0; j < this.Parameters.NumberOfMessages / this.Parameters.NumberOfThreads; ++j)
+ {
+ if (this.Parameters.SenderShouldAbort)
+ {
+ this.SendMessage(channel, "Message", false);
+ }
+
+ this.SendMessage(channel, "Message", true);
+ }
+ if (Interlocked.Increment(ref threadsCompleted) == this.Parameters.NumberOfThreads)
+ {
+ doneSending.Set();
+ }
+ }));
+ }
+ }
+
+ TimeSpan threadTimeout = TimeSpan.FromMinutes(2.0);
+ if (!doneSending.WaitOne(threadTimeout, false))
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Threads did not complete within {0}.", threadTimeout));
+ }
+ }
+
+ doneSending.Close();
+ channel.Close();
+ }
+
+ private void SendMessage(IOutputChannel channel, string action, bool commit)
+ {
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ Message message = Message.CreateMessage(MessageVersion.Default, action);
+
+ if (this.Parameters.AsyncSend)
+ {
+ IAsyncResult result = channel.BeginSend(message, null, null);
+ channel.EndSend(result);
+ }
+ else
+ {
+ channel.Send(message);
+ }
+
+ if (commit)
+ {
+ ts.Complete();
+ }
+ else
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/CustomAmqpBindingTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/CustomAmqpBindingTest.cs
new file mode 100644
index 0000000000..45a926ce4d
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/CustomAmqpBindingTest.cs
@@ -0,0 +1,77 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.ServiceModel;
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class CustomAmqpBindingTest
+ {
+ private MessageClient client;
+
+ [SetUp]
+ public void Setup()
+ {
+ // Create client
+ this.client = new MessageClient();
+ this.client.NumberOfMessages = 3;
+ this.client.NumberOfIterations = 3;
+
+ // Setup and start service
+ MessageService.EndpointAddress = "amqp:message_queue";
+ MessageService.ContractTypes = new List<Type>();
+ MessageService.ContractTypes.Add(typeof(IInteropService));
+ MessageService.CompletionHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
+ MessageService.IntendedInvocationCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+ MessageService.StartService(Util.GetCustomBinding());
+ }
+
+ [Test]
+ public void Run()
+ {
+ // Create the WCF AMQP channel and send messages
+ MethodInfo runClientMethod = this.client.GetType().GetMethod("RunInteropClient");
+ EndpointAddress address = new EndpointAddress("amqp:amq.direct?routingkey=routing_key");
+ foreach (Type contractType in MessageService.ContractTypes)
+ {
+ MethodInfo runClientT = runClientMethod.MakeGenericMethod(contractType);
+ runClientT.Invoke(this.client, new object[] { address });
+ }
+
+ // Allow the WCF service to process all the messages before validation
+ MessageService.CompletionHandle.WaitOne(TimeSpan.FromSeconds(10.0), false);
+
+ // Validation
+ int expectedMethodCallCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+ Assert.AreEqual(expectedMethodCallCount, MessageService.TotalMethodCallCount);
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ MessageService.StopService();
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj
new file mode 100644
index 0000000000..ab36222d6a
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.30729</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{E2D8C779-E417-40BA-BEE1-EE034268482F}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Apache.Qpid.Test.Channel.Functional</RootNamespace>
+ <AssemblyName>Apache.Qpid.Test.Channel.Functional</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="nunit.framework, Version=2.5.2.9222, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Transactions" />
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AsyncTest.cs" />
+ <Compile Include="ChannelAbortCommitTest.cs" />
+ <Compile Include="ChannelContextParameters.cs" />
+ <Compile Include="ChannelEntity.cs" />
+ <Compile Include="ChannelReceiver.cs" />
+ <Compile Include="ChannelSender.cs" />
+ <Compile Include="CustomAmqpBindingTest.cs" />
+ <Compile Include="IGenericObjectService.cs" />
+ <Compile Include="IInteropService.cs" />
+ <Compile Include="IQueuedDatagramService1.cs" />
+ <Compile Include="IQueuedDatagramService2.cs" />
+ <Compile Include="IQueuedDatagramService3.cs" />
+ <Compile Include="IQueuedServiceUsingTransactionScope.cs" />
+ <Compile Include="IQueuedServiceUsingTSRAttribute.cs" />
+ <Compile Include="MessageBodyTest.cs" />
+ <Compile Include="MessagePropertiesTest.cs" />
+ <Compile Include="MultipleEndpointsSameQueueTest.cs" />
+ <Compile Include="MessageClient.cs" />
+ <Compile Include="MessageService.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="BasicTransactionTest.cs" />
+ <Compile Include="Util.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\..\..\..\..\src\Apache\Qpid\Channel\Channel.csproj">
+ <Project>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</Project>
+ <Name>Channel</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\..\..\..\..\src\Apache\Qpid\Interop\Interop.vcproj">
+ <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+ <Name>Interop</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+ <PropertyGroup>
+ <PostBuildEvent>
+ </PostBuildEvent>
+ </PropertyGroup>
+</Project> \ No newline at end of file
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IGenericObjectService.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IGenericObjectService.cs
new file mode 100644
index 0000000000..a1ffac50b3
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IGenericObjectService.cs
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract(SessionMode = SessionMode.NotAllowed)]
+ public interface IGenericObjectService
+ {
+ [OperationContract(IsOneWay = true)]
+ void SendObject(object message);
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IInteropService.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IInteropService.cs
new file mode 100644
index 0000000000..25f7010a89
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IInteropService.cs
@@ -0,0 +1,31 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+
+ [ServiceContract]
+ public interface IInteropService
+ {
+ [OperationContract(IsOneWay = true, Action = "*")]
+ void Hello(Message message);
+ }
+} \ No newline at end of file
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs
new file mode 100644
index 0000000000..8abbe04874
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract(SessionMode = SessionMode.NotAllowed)]
+ public interface IQueuedDatagramService1
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+
+ [OperationContract(IsOneWay = true)]
+ void Goodbye();
+ }
+} \ No newline at end of file
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs
new file mode 100644
index 0000000000..7d056e9c82
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract(SessionMode = SessionMode.NotAllowed)]
+ public interface IQueuedDatagramService2
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+
+ [OperationContract(IsOneWay = true)]
+ void Goodbye();
+ }
+} \ No newline at end of file
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService3.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService3.cs
new file mode 100644
index 0000000000..3ff2085557
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService3.cs
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract(SessionMode = SessionMode.NotAllowed)]
+ public interface IQueuedDatagramService3
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+
+ [OperationContract(IsOneWay = true)]
+ void Goodbye();
+ }
+} \ No newline at end of file
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs
new file mode 100644
index 0000000000..49c42a25b6
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract]
+ public interface IQueuedServiceUsingTSRAttribute
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs
new file mode 100644
index 0000000000..eabceb5720
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract]
+ public interface IQueuedServiceUsingTransactionScope
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs
new file mode 100644
index 0000000000..a9555d190d
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs
@@ -0,0 +1,134 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Runtime.Serialization;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class MessageBodyTest
+ {
+ private const string Queue = "amqp:amq.direct?routingkey=routing_key";
+
+ [Test]
+ public void DateVariation()
+ {
+ DateTime rightNow = DateTime.UtcNow;
+ this.SendMessage(rightNow);
+ this.ReceiveMessage<DateTime>(rightNow);
+ }
+
+ [Test]
+ public void EmptyStringVariation()
+ {
+ const string TestString = "";
+ this.SendMessage(TestString);
+ this.ReceiveMessage<string>(TestString);
+ }
+
+ [Test]
+ public void IntPrimitiveVariation()
+ {
+ const int TheAnswer = 42;
+ this.SendMessage(TheAnswer);
+ this.ReceiveMessage<int>(TheAnswer);
+ }
+
+ [Test]
+ public void MultipleIntVariation()
+ {
+ const int NumberOfMessages = 20;
+ int[] listOfNumbers = new int[NumberOfMessages];
+
+ for (int i = 0; i < NumberOfMessages; i++)
+ {
+ this.SendMessage(i);
+ listOfNumbers[i] = i;
+ }
+
+ Assert.True(listOfNumbers[NumberOfMessages - 1] != 0, "Not all messages were sent.");
+
+ for (int j = 0; j < NumberOfMessages; j++)
+ {
+ int receivedNumber = this.ReceiveMessage<int>();
+ Assert.True(listOfNumbers[j].Equals(receivedNumber), "Received {0} - this number is unknown or has been received more than once.", receivedNumber);
+ }
+ }
+
+ [Test]
+ public void StringVariation()
+ {
+ const string TestString = "The darkest of dim, dreary days dost draw deathly deeds. どーも";
+ this.SendMessage(TestString);
+ this.ReceiveMessage<string>(TestString);
+ }
+
+ private void SendMessage(object objectToSend)
+ {
+ IChannelFactory<IOutputChannel> channelFactory =
+ Util.GetBinding().BuildChannelFactory<IOutputChannel>();
+ channelFactory.Open();
+ IOutputChannel proxy = channelFactory.CreateChannel(new EndpointAddress(Queue));
+ proxy.Open();
+ Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, objectToSend);
+ proxy.Send(toSend);
+ toSend.Close();
+ channelFactory.Close();
+ }
+
+ private TObjectType ReceiveMessage<TObjectType>()
+ {
+ Uri endpoint = new Uri("amqp:message_queue");
+ IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(endpoint, new BindingParameterCollection());
+ listener.Open();
+ IInputChannel service = listener.AcceptChannel(TimeSpan.FromSeconds(10));
+ service.Open();
+ Message receivedMessage = service.Receive(TimeSpan.FromSeconds(10));
+ Assert.NotNull(receivedMessage, "Message was not received");
+ try
+ {
+ TObjectType receivedObject = receivedMessage.GetBody<TObjectType>();
+ return receivedObject;
+ }
+ catch (SerializationException)
+ {
+ Assert.Fail("Deserialized object not of correct type");
+ }
+ finally
+ {
+ receivedMessage.Close();
+ service.Close();
+ listener.Close();
+ }
+
+ return default(TObjectType);
+ }
+
+ private TObjectType ReceiveMessage<TObjectType>(TObjectType objectToMatch)
+ {
+ TObjectType receivedObject = this.ReceiveMessage<TObjectType>();
+ Assert.True(objectToMatch.Equals(receivedObject), "Original and deserialized objects do not match");
+ return receivedObject;
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs
new file mode 100644
index 0000000000..b623a0196b
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs
@@ -0,0 +1,144 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Reflection;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Transactions;
+
+ public class MessageClient
+ {
+ public int NumberOfMessages
+ {
+ get;
+ set;
+ }
+
+ public int NumberOfIterations
+ {
+ get;
+ set;
+ }
+
+ public bool TransactedSend
+ {
+ get;
+ set;
+ }
+
+ public bool CommitTransaction
+ {
+ get;
+ set;
+ }
+
+ public void RunClient<TServiceContract>(EndpointAddress address)
+ {
+ string[] messages = new string[this.NumberOfMessages];
+ for (int i = 0; i < this.NumberOfMessages; ++i)
+ {
+ messages[i] = "Message " + i;
+ }
+
+ RunTestClient<TServiceContract>(address, messages);
+ }
+
+ public void RunTestClient<TServiceContract>(EndpointAddress address, object[] messages)
+ {
+ Binding amqpBinding = Util.GetBinding();
+ Type proxyType = typeof(TServiceContract);
+ MethodInfo helloMethod = proxyType.GetMethod("Hello");
+ MethodInfo goodbyeMethod = proxyType.GetMethod("Goodbye");
+
+ for (int i = 0; i < this.NumberOfIterations; ++i)
+ {
+ this.CreateChannelAndSendMessages<TServiceContract>(address, amqpBinding, helloMethod, goodbyeMethod, messages);
+ }
+ }
+
+ public void RunInteropClient<TServiceContract>(EndpointAddress address)
+ {
+ Binding amqpBinding = Util.GetBinding();
+ Type proxyType = typeof(TServiceContract);
+ MethodInfo helloMethod = proxyType.GetMethod("Hello");
+
+ Message[] messages = new Message[this.NumberOfMessages];
+
+ for (int i = 0; i < this.NumberOfIterations; ++i)
+ {
+ this.CreateInteropChannelAndSendMessages<TServiceContract>(address, amqpBinding, helloMethod, this.NumberOfMessages);
+ }
+ }
+
+ private void CreateChannelAndSendMessages<TServiceContract>(EndpointAddress address, Binding amqpBinding, MethodInfo helloMethod, MethodInfo goodbyeMethod, object[] messages)
+ {
+ ChannelFactory<TServiceContract> channelFactory = new ChannelFactory<TServiceContract>(amqpBinding, address);
+ TServiceContract proxy = channelFactory.CreateChannel();
+
+ if (this.TransactedSend)
+ {
+ using (TransactionScope tx = new TransactionScope(TransactionScopeOption.Required, TimeSpan.FromMinutes(20)))
+ {
+ foreach (object message in messages)
+ {
+ helloMethod.Invoke(proxy, new object[] { message });
+ }
+
+ if (goodbyeMethod != null)
+ {
+ goodbyeMethod.Invoke(proxy, new object[0]);
+ }
+
+ if (this.CommitTransaction)
+ {
+ tx.Complete();
+ }
+ }
+ }
+ else
+ {
+ foreach (object message in messages)
+ {
+ helloMethod.Invoke(proxy, new object[] { message });
+ }
+
+ if (goodbyeMethod != null)
+ {
+ goodbyeMethod.Invoke(proxy, new object[0]);
+ }
+ }
+ }
+
+ private void CreateInteropChannelAndSendMessages<TServiceContract>(EndpointAddress address, Binding amqpBinding, MethodInfo helloMethod, int messageCount)
+ {
+ ChannelFactory<TServiceContract> channelFactory = new ChannelFactory<TServiceContract>(amqpBinding, address);
+ TServiceContract proxy = channelFactory.CreateChannel();
+
+ for (int i = 0; i < messageCount; i++)
+ {
+ helloMethod.Invoke(proxy, new object[] { Message.CreateMessage(MessageVersion.Soap12WSAddressing10, "*") });
+ }
+
+ channelFactory.Close();
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageProperties.txt b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageProperties.txt
new file mode 100644
index 0000000000..bd6459ccb9
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageProperties.txt
@@ -0,0 +1,22 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+ContentType=Text
+Durable=true
+RoutingKey=routing_key
+TimeToLive=00:00:10 \ No newline at end of file
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessagePropertiesTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessagePropertiesTest.cs
new file mode 100644
index 0000000000..8e192e90f1
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessagePropertiesTest.cs
@@ -0,0 +1,131 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.Runtime.Serialization;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using Apache.Qpid.AmqpTypes;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class MessagePropertiesTest
+ {
+ private const string RoutingKey = "routing_key";
+ private const string SendToUri = "amqp:amq.direct?routingkey=" + RoutingKey;
+
+ [Test]
+ public void DefaultAmqpProperties()
+ {
+ const string TestString = "Test Message";
+ AmqpProperties messageProperties = new AmqpProperties();
+
+ this.SendMessage(TestString, messageProperties);
+ this.ReceiveMessage<string>(TestString, messageProperties);
+ }
+
+ [Test]
+ public void NonDefaultAmqpProperties()
+ {
+ const string TestString = "Test Message";
+ AmqpProperties messageProperties = this.CreateMessageProperties();
+
+ this.SendMessage(TestString, messageProperties);
+ this.ReceiveMessage<string>(TestString, messageProperties);
+ }
+
+ private AmqpProperties CreateMessageProperties()
+ {
+ Dictionary<string, string> messageProperties = Util.GetProperties("..\\..\\MessageProperties.txt");
+
+ AmqpProperties amqpProperties = new AmqpProperties();
+ amqpProperties.ContentType = (string)messageProperties["ContentType"];
+ amqpProperties.Durable = Convert.ToBoolean((string)messageProperties["Durable"]);
+ amqpProperties.RoutingKey = (string)messageProperties["RoutingKey"];
+ amqpProperties.TimeToLive = TimeSpan.Parse((string)messageProperties["TimeToLive"]);
+
+ return amqpProperties;
+ }
+
+ private void SendMessage(object objectToSend, AmqpProperties propertiesToSend)
+ {
+ ChannelFactory<IOutputChannel> channelFactory =
+ new ChannelFactory<IOutputChannel>(Util.GetBinding(), SendToUri);
+ IOutputChannel proxy = channelFactory.CreateChannel();
+ proxy.Open();
+
+ Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, objectToSend);
+ toSend.Properties["AmqpProperties"] = propertiesToSend;
+ proxy.Send(toSend);
+
+ toSend.Close();
+ proxy.Close();
+ channelFactory.Close();
+ }
+
+ private void ReceiveMessage<TObjectType>(TObjectType objectToMatch, AmqpProperties expectedProperties)
+ {
+ Uri receiveFromUri = new Uri("amqp:message_queue");
+ IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(receiveFromUri, new BindingParameterCollection());
+ listener.Open();
+ IInputChannel service = listener.AcceptChannel(TimeSpan.FromSeconds(10));
+ service.Open();
+ Message receivedMessage = service.Receive(TimeSpan.FromSeconds(10));
+ try
+ {
+ TObjectType receivedObject = receivedMessage.GetBody<TObjectType>();
+ Assert.True(receivedObject.Equals(objectToMatch), "Original and deserialized objects do not match");
+
+ AmqpProperties receivedProperties = (AmqpProperties)receivedMessage.Properties["AmqpProperties"];
+ PropertyInfo[] propInfo = typeof(AmqpProperties).GetProperties();
+
+ for (int i = 0; i < propInfo.Length; i++)
+ {
+ string propertyName = propInfo[i].Name;
+ if (propertyName.Equals("RoutingKey", StringComparison.InvariantCultureIgnoreCase))
+ {
+ Assert.AreEqual(RoutingKey, Convert.ToString(propInfo[i].GetValue(receivedProperties, null)));
+ }
+ else
+ {
+ Assert.AreEqual(Convert.ToString(propInfo[i].GetValue(expectedProperties, null)), Convert.ToString(propInfo[i].GetValue(receivedProperties, null)));
+ }
+ }
+ }
+ catch (NullReferenceException)
+ {
+ Assert.Fail("Message not received");
+ }
+ catch (SerializationException)
+ {
+ Assert.Fail("Deserialized object not of correct type");
+ }
+ finally
+ {
+ receivedMessage.Close();
+ service.Close();
+ listener.Close();
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs
new file mode 100644
index 0000000000..581464d25e
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs
@@ -0,0 +1,198 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class MessageService : IQueuedDatagramService1, IQueuedDatagramService2, IQueuedDatagramService3, IInteropService, IQueuedServiceUsingTransactionScope, IQueuedServiceUsingTSRAttribute
+ {
+ private static Dictionary<string, int> methodCallCount = new Dictionary<string, int>();
+ private static ServiceHost serviceHost;
+
+ public static EventWaitHandle CompletionHandle
+ {
+ get;
+ set;
+ }
+
+ public static int IntendedInvocationCount
+ {
+ get;
+ set;
+ }
+
+ public static int TotalMethodCallCount
+ {
+ get;
+ set;
+ }
+
+ // The test must set the following paramters
+ public static List<Type> ContractTypes
+ {
+ get;
+ set;
+ }
+
+ public static string EndpointAddress
+ {
+ get;
+ set;
+ }
+
+ public static void DisplayCounts()
+ {
+ Console.WriteLine("Method calls:");
+ foreach (string key in methodCallCount.Keys)
+ {
+ Console.WriteLine(" {0}: {1}", key, methodCallCount[key]);
+ }
+
+ Console.WriteLine("Total: {0}", TotalMethodCallCount);
+ }
+
+ public static void StartService(Binding amqpBinding)
+ {
+ MessageService.methodCallCount.Clear();
+ MessageService.TotalMethodCallCount = 0;
+
+ serviceHost = new ServiceHost(typeof(MessageService));
+
+ foreach (Type contractType in ContractTypes)
+ {
+ serviceHost.AddServiceEndpoint(contractType, amqpBinding, EndpointAddress);
+ }
+
+ serviceHost.Open();
+ }
+
+ public static bool IsServiceRunning()
+ {
+ return (serviceHost != null && serviceHost.State == CommunicationState.Opened) ? true : false;
+ }
+
+ public static void StopService()
+ {
+ if (serviceHost.State != CommunicationState.Faulted)
+ {
+ try
+ {
+ serviceHost.Close();
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("An exception was thrown while trying to close the service host.\n" + e);
+ }
+ }
+ else
+ {
+ Console.WriteLine("Service Faulted.");
+ }
+ }
+
+ public void UpdateCounts(string method)
+ {
+ lock (methodCallCount)
+ {
+ if (!methodCallCount.ContainsKey(method))
+ {
+ methodCallCount[method] = 0;
+ }
+
+ ++methodCallCount[method];
+ ++TotalMethodCallCount;
+
+ if (TotalMethodCallCount >= IntendedInvocationCount && CompletionHandle != null)
+ {
+ CompletionHandle.Set();
+ }
+ }
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedServiceUsingTransactionScope.Hello(string message)
+ {
+ this.UpdateCounts("IQueuedServiceUsingTransactionScope.Hello");
+
+ if (message.Trim().ToLower().StartsWith("abort"))
+ {
+ throw new Exception();
+ }
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedServiceUsingTSRAttribute.Hello(string message)
+ {
+ this.UpdateCounts("IQueuedServiceUsingTSRAttribute.Hello");
+
+ if (message.Trim().ToLower().StartsWith("abort"))
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedDatagramService1.Hello(string message)
+ {
+ this.UpdateCounts("IQueuedDatagramService1.Hello");
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedDatagramService1.Goodbye()
+ {
+ this.UpdateCounts("IQueuedDatagramService1.Goodbye");
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedDatagramService2.Hello(string message)
+ {
+ this.UpdateCounts("IQueuedDatagramService2.Hello");
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedDatagramService2.Goodbye()
+ {
+ this.UpdateCounts("IQueuedDatagramService2.Goodbye");
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedDatagramService3.Hello(string message)
+ {
+ this.UpdateCounts("IQueuedDatagramService3.Hello");
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedDatagramService3.Goodbye()
+ {
+ this.UpdateCounts("IQueuedDatagramService3.Goodbye");
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IInteropService.Hello(Message message)
+ {
+ this.UpdateCounts("IInteropService.Hello");
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MultipleEndpointsSameQueueTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MultipleEndpointsSameQueueTest.cs
new file mode 100644
index 0000000000..d09832757a
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MultipleEndpointsSameQueueTest.cs
@@ -0,0 +1,83 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.ServiceModel;
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class MultipleEndpointsSameQueueTest
+ {
+ private MessageClient client;
+
+ [SetUp]
+ public void Setup()
+ {
+ // Create client
+ this.client = new MessageClient();
+ this.client.NumberOfMessages = 3;
+ this.client.NumberOfIterations = 5;
+
+ // Setup and start service
+ MessageService.EndpointAddress = "amqp:message_queue";
+
+ MessageService.ContractTypes = new List<Type>();
+ MessageService.ContractTypes.Add(typeof(IQueuedDatagramService1));
+ MessageService.ContractTypes.Add(typeof(IQueuedDatagramService2));
+ MessageService.ContractTypes.Add(typeof(IQueuedDatagramService3));
+ MessageService.CompletionHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
+ MessageService.IntendedInvocationCount = this.client.NumberOfIterations * (this.client.NumberOfMessages + 1) * MessageService.ContractTypes.Count;
+
+ MessageService.StartService(Util.GetBinding());
+ }
+
+ [Test]
+ public void Run()
+ {
+ // Create wcf amqpchannel and send messages
+ MethodInfo runClientMethod = this.client.GetType().GetMethod("RunClient");
+ EndpointAddress address = new EndpointAddress("amqp:amq.direct?routingkey=routing_key");
+
+ foreach (Type contractType in MessageService.ContractTypes)
+ {
+ MethodInfo runClientT = runClientMethod.MakeGenericMethod(contractType);
+ runClientT.Invoke(this.client, new object[] { address });
+ }
+
+ // Allow the wcf service to process all the messages before validation
+ MessageService.CompletionHandle.WaitOne(TimeSpan.FromSeconds(10.0), false);
+
+ // Validation
+ int expectedMethodCallCount = this.client.NumberOfIterations * (this.client.NumberOfMessages + 1) * MessageService.ContractTypes.Count;
+
+ Assert.AreEqual(expectedMethodCallCount, MessageService.TotalMethodCallCount);
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ MessageService.StopService();
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Properties/AssemblyInfo.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000000..b47a25494f
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Apache.Qpid.Test.Channel.Functional")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("")]
+[assembly: AssemblyCopyright("")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("552dca74-b5a3-4ad3-a718-4a1dd03db039")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat
new file mode 100755
index 0000000000..a5eed8839b
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat
@@ -0,0 +1,34 @@
+@echo OFF
+
+REM Licensed to the Apache Software Foundation (ASF) under one
+REM or more contributor license agreements. See the NOTICE file
+REM distributed with this work for additional information
+REM regarding copyright ownership. The ASF licenses this file
+REM to you under the Apache License, Version 2.0 (the
+REM "License"); you may not use this file except in compliance
+REM with the License. You may obtain a copy of the License at
+REM
+REM http://www.apache.org/licenses/LICENSE-2.0
+REM
+REM Unless required by applicable law or agreed to in writing,
+REM software distributed under the License is distributed on an
+REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+REM KIND, either express or implied. See the License for the
+REM specific language governing permissions and limitations
+REM under the License.
+
+
+set nunit_exe=%programfiles%\NUnit 2.5.1\bin\net-2.0\nunit-console.exe
+set qpid_dll_location=%QPID_BUILD_ROOT%\src\Debug
+set configuration_name=bin\Debug
+set qcreate_location=..\..\..\..\..\..\tools\QCreate\Debug
+
+copy %qpid_dll_location%\qpidclientd.dll %configuration_name%
+copy %qpid_dll_location%\qpidcommond.dll %configuration_name%
+
+copy %qpid_dll_location%\qpidclientd.dll %qcreate_location%
+copy %qpid_dll_location%\qpidcommond.dll %qcreate_location%
+
+%qcreate_location%\QCreate.exe amq.direct routing_key message_queue
+
+"%nunit_exe%" %configuration_name%\Apache.Qpid.Test.Channel.Functional.dll
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs
new file mode 100644
index 0000000000..f08a6fbbfc
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs
@@ -0,0 +1,157 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using Apache.Qpid.Channel;
+
+ internal class Util
+ {
+ public static Dictionary<string, string> GetProperties(string path)
+ {
+ string fileData = string.Empty;
+ using (StreamReader sr = new StreamReader(path))
+ {
+ fileData = sr.ReadToEnd().Replace("\r", string.Empty);
+ }
+
+ Dictionary<string, string> properties = new Dictionary<string, string>();
+ string[] kvp;
+ string[] records = fileData.Split("\n".ToCharArray());
+ foreach (string record in records)
+ {
+ if (record[0] == '/' || record[0] == '*')
+ {
+ continue;
+ }
+
+ kvp = record.Split("=".ToCharArray());
+ properties.Add(kvp[0], kvp[1]);
+ }
+
+ return properties;
+ }
+
+ public static Binding GetBinding()
+ {
+ return new AmqpBinding();
+ }
+
+ public static Binding GetCustomBinding()
+ {
+ AmqpTransportBindingElement transportElement = new AmqpTransportBindingElement();
+ RawMessageEncodingBindingElement encodingElement = new RawMessageEncodingBindingElement();
+ transportElement.BrokerHost = "127.0.0.1";
+ transportElement.TransferMode = TransferMode.Streamed;
+
+ CustomBinding brokerBinding = new CustomBinding();
+ brokerBinding.Elements.Add(encodingElement);
+ brokerBinding.Elements.Add(transportElement);
+
+ return brokerBinding;
+ }
+
+ public static int GetMessageCountFromQueue(string listenUri)
+ {
+ Message receivedMessage = null;
+ int messageCount = 0;
+
+ IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(new Uri(listenUri), new BindingParameterCollection());
+ listener.Open();
+ IInputChannel proxy = listener.AcceptChannel(TimeSpan.FromSeconds(10));
+ proxy.Open();
+
+ while (true)
+ {
+ try
+ {
+ receivedMessage = proxy.Receive(TimeSpan.FromSeconds(3));
+ }
+ catch (Exception e)
+ {
+ if (e.GetType() == typeof(TimeoutException))
+ {
+ break;
+ }
+ else
+ {
+ throw;
+ }
+ }
+
+ messageCount++;
+ }
+
+ listener.Close();
+ return messageCount;
+ }
+
+ public static void PurgeQueue(string listenUri)
+ {
+ GetMessageCountFromQueue(listenUri);
+ }
+
+ public static bool CompareResults(List<string> expectedResults, List<string> actualResults)
+ {
+ IEnumerator<string> actualResultEnumerator = actualResults.GetEnumerator();
+ IEnumerator<string> expectedResultEnumerator = expectedResults.GetEnumerator();
+
+ bool expectedResultEnumeratorPosition = expectedResultEnumerator.MoveNext();
+ bool actualResultEnumeratorPosition = actualResultEnumerator.MoveNext();
+
+ while (true == actualResultEnumeratorPosition &&
+ true == expectedResultEnumeratorPosition)
+ {
+ string expectedResult = expectedResultEnumerator.Current;
+ string actualResult = actualResultEnumerator.Current;
+
+ if (expectedResult.Equals(actualResult) == false)
+ {
+ Console.WriteLine("OrderedResultsComparator: Expected result '{0}', but got '{1}' instead.", expectedResult, actualResult);
+ return false;
+ }
+
+ expectedResultEnumeratorPosition = expectedResultEnumerator.MoveNext();
+ actualResultEnumeratorPosition = actualResultEnumerator.MoveNext();
+ }
+
+ // if either of them has still more data left, its an error
+ if (true == expectedResultEnumeratorPosition)
+ {
+ string expectedResult = expectedResultEnumerator.Current;
+ Console.WriteLine("OrderedResultsComparator: Got fewer results than expected, first missing result: '{0}'", expectedResult);
+ return false;
+ }
+
+ if (true == actualResultEnumeratorPosition)
+ {
+ string actualResult = actualResultEnumerator.Current;
+ Console.WriteLine("OrderedResultsComparator: Got more results than expected, first extra result: '{0}'", actualResult);
+ return false;
+ }
+
+ return true;
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs
new file mode 100644
index 0000000000..55a01c790c
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs
@@ -0,0 +1,161 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.WcfPerftest
+{
+ using System;
+ using System.Collections;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Threading;
+ using System.Text;
+ using System.Xml;
+ using Apache.Qpid.Channel;
+
+
+ /// <summary>
+ /// A sample interface for populating and extracting message body content.
+ /// Just enough methods to handle basic Interop text and raw byte messages.
+ /// </summary>
+
+
+ public interface IRawBodyUtility
+ {
+ Message CreateMessage(byte[] body, int offset, int len);
+ Message CreateMessage(byte[] body);
+ byte[] GetBytes(Message m, byte[] recyclableBuffer);
+
+ Message CreateMessage(string body);
+ string GetText(Message m);
+ }
+
+ // an implementation of IRawBodyUtility that expects a RawMessageEncoder based channel
+
+ public class RawEncoderUtility : IRawBodyUtility
+ {
+ public Message CreateMessage(byte[] body, int offset, int count)
+ {
+ return Message.CreateMessage(MessageVersion.None, "", new RawEncoderBodyWriter(body, offset, count));
+ }
+
+ public Message CreateMessage(byte[] body)
+ {
+ return CreateMessage(body, 0, body.Length);
+ }
+
+ public byte[] GetBytes(Message message, byte[] recyclableBuffer)
+ {
+ XmlDictionaryReader reader = message.GetReaderAtBodyContents();
+ int length;
+
+ while (!reader.HasValue)
+ {
+ reader.Read();
+ if (reader.EOF)
+ {
+ throw new InvalidDataException("empty XmlDictionaryReader");
+ }
+ }
+
+ if (reader.TryGetBase64ContentLength(out length))
+ {
+ byte[] bytes = null;
+ if (recyclableBuffer != null)
+ {
+ if (recyclableBuffer.Length == length)
+ {
+ // reuse
+ bytes = recyclableBuffer;
+ }
+ }
+
+ if (bytes == null)
+ {
+ bytes = new byte[length];
+ }
+
+ // this is the single copy mechanism from native to managed space with no intervening
+ // buffers. One could also write a method GetBytes(msg, myBuf, offset)...
+ reader.ReadContentAsBase64(bytes, 0, length);
+ reader.Close();
+ return bytes;
+ }
+ else
+ {
+ // uses whatever default buffering mechanism is used by the base XmlDictionaryReader class
+ return reader.ReadContentAsBase64();
+ }
+ }
+
+ public Message CreateMessage(string body)
+ {
+ return Message.CreateMessage(MessageVersion.None, "", new RawEncoderBodyWriter(body));
+ }
+
+ public string GetText(Message message)
+ {
+ byte[] rawBuffer = GetBytes(message, null);
+ return Encoding.UTF8.GetString(rawBuffer, 0, rawBuffer.Length);
+ }
+
+ internal class RawEncoderBodyWriter : BodyWriter
+ {
+ // works only with the Raw Encoder; the "body" is either a single string or byte[] segment
+ String bodyAsString;
+ byte[] bodyAsBytes;
+ int offset;
+ int count;
+
+ public RawEncoderBodyWriter(string body)
+ : base(false) // isBuffered
+ {
+ this.bodyAsString = body;
+ }
+
+ public RawEncoderBodyWriter(byte[] body, int offset, int count)
+ : base(false) // isBuffered
+ {
+ this.bodyAsBytes = body;
+ this.offset = offset;
+ this.count = count;
+ }
+
+ protected override void OnWriteBodyContents(System.Xml.XmlDictionaryWriter writer)
+ {
+ // TODO: RawMessageEncoder.StreamElementName should be public.
+ writer.WriteStartElement("Binary"); // the expected Raw encoder "<Binary>" virtual xml tag
+
+ if (bodyAsString != null)
+ {
+ byte[] buf = Encoding.UTF8.GetBytes(bodyAsString);
+ writer.WriteBase64(buf, 0, buf.Length);
+ }
+ else
+ {
+ writer.WriteBase64(this.bodyAsBytes, this.offset, this.count);
+ }
+
+ writer.WriteEndElement();
+ }
+ }
+ }
+
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs
new file mode 100644
index 0000000000..c97d3da27c
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs
@@ -0,0 +1,783 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.WcfPerftest
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.ComponentModel;
+ using System.Configuration;
+ using System.Diagnostics;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Threading;
+ using System.Transactions;
+ using System.Text;
+ using System.Xml;
+ using Apache.Qpid.AmqpTypes;
+ using Apache.Qpid.Channel;
+
+ // this program implements a subset of the functionality in qpid\cpp\src\tests\perftest.cpp
+
+ // for a given broker, create reader and writer channels to queues/exchanges
+ // lazilly creates binding and channel factories
+
+ public class QueueChannelFactory
+ {
+ private static AmqpBinding brokerBinding;
+ private static BindingParameterCollection bindingParameters;
+ private static IChannelFactory<IInputChannel> readerFactory;
+ private static IChannelFactory<IOutputChannel> writerFactory;
+ private static string brokerAddr = "127.0.0.1";
+ private static int brokerPort = 5672;
+ private static string userName;
+ private static string password;
+ private static bool ssl = false;
+
+ public static void SetBroker(string addr, int port)
+ {
+ brokerAddr = addr;
+ brokerPort = port;
+ }
+
+ public static void SetSecurity(bool sslMode, string name, string pass)
+ {
+ ssl = sslMode;
+ if (name != null)
+ {
+ userName = name;
+ password = pass;
+ }
+ }
+
+ private static void InitializeBinding()
+ {
+ AmqpBinaryBinding binding = new AmqpBinaryBinding();
+ bindingParameters = new BindingParameterCollection();
+
+ binding.BrokerHost = brokerAddr;
+ binding.BrokerPort = brokerPort;
+ binding.TransferMode = TransferMode.Streamed;
+ binding.PrefetchLimit = 5000;
+ binding.Shared = true;
+
+ if (ssl || (userName != null))
+ {
+ binding.Security.Mode = AmqpSecurityMode.Transport;
+ binding.Security.Transport.UseSSL = ssl;
+
+ if (userName != null)
+ {
+ binding.Security.Transport.CredentialType = AmqpCredentialType.Plain;
+
+ ClientCredentials credentials = new ClientCredentials();
+ credentials.UserName.UserName = userName;
+ credentials.UserName.Password = password;
+ bindingParameters.Add(credentials);
+ }
+ }
+
+ brokerBinding = binding;
+ }
+
+ public static IInputChannel CreateReaderChannel(string queueName)
+ {
+ lock (typeof(QueueChannelFactory))
+ {
+ if (brokerBinding == null)
+ {
+ InitializeBinding();
+ }
+
+ if (readerFactory == null)
+ {
+ readerFactory = brokerBinding.BuildChannelFactory<IInputChannel>(bindingParameters);
+ readerFactory.Open();
+ }
+
+ IInputChannel channel = readerFactory.CreateChannel(new EndpointAddress(
+ new Uri("amqp:" + queueName)));
+ channel.Open();
+
+ return channel;
+ }
+ }
+
+ public static IOutputChannel CreateWriterChannel(string exchangeName, string routingKey)
+ {
+ lock (typeof(QueueChannelFactory))
+ {
+ if (brokerBinding == null)
+ {
+ InitializeBinding();
+ }
+
+ if (writerFactory == null)
+ {
+ writerFactory = brokerBinding.BuildChannelFactory<IOutputChannel>(bindingParameters);
+ writerFactory.Open();
+ }
+
+ IOutputChannel channel = writerFactory.CreateChannel(new EndpointAddress(
+ "amqp:" + exchangeName +
+ "?routingkey=" + routingKey));
+ channel.Open();
+
+ return channel;
+ }
+ }
+ }
+
+ public enum ClientType
+ {
+ Publisher,
+ Subscriber,
+ InteropDemo
+ }
+
+ public enum SaslMechanism
+ {
+ None,
+ Plain
+ }
+
+ public class Options
+ {
+ public string broker;
+ public int port;
+ public UInt64 messageCount;
+ public int messageSize;
+ public ClientType type;
+ public string baseName;
+ public int subTxSize;
+ public int pubTxSize;
+ public bool durable;
+ public bool ssl;
+ public string username;
+ public string password;
+ public SaslMechanism saslMechanism;
+
+ public Options()
+ {
+ this.broker = "127.0.0.1";
+ this.port = 5672;
+ this.messageCount = 500000;
+ this.messageSize = 1024;
+ this.type = ClientType.InteropDemo; // default: once as pub and once as sub
+ this.baseName = "qpid-perftest";
+ this.pubTxSize = 0;
+ this.subTxSize = 0;
+ this.durable = false;
+ this.ssl = false;
+ this.username = null;
+ this.password = null;
+ this.saslMechanism = SaslMechanism.None;
+ }
+
+ public void Parse(string[] args)
+ {
+ int argCount = args.Length;
+ int current = 0;
+ bool typeSelected = false;
+
+ while (current < argCount)
+ {
+ string arg = args[current];
+ if (arg == "--publish")
+ {
+ if (typeSelected)
+ throw new ArgumentException("too many roles");
+
+ this.type = ClientType.Publisher;
+ typeSelected = true;
+ }
+ else if (arg == "--subscribe")
+ {
+ if (typeSelected)
+ throw new ArgumentException("too many roles");
+
+ this.type = ClientType.Subscriber;
+ typeSelected = true;
+ }
+ else if (arg == "--size")
+ {
+ arg = args[++current];
+ int i = int.Parse(arg);
+ if (i > 0)
+ {
+ this.messageSize = i;
+ }
+ }
+ else if (arg == "--count")
+ {
+ arg = args[++current];
+ UInt64 i = UInt64.Parse(arg);
+ if (i > 0)
+ {
+ this.messageCount = i;
+ }
+ }
+ else if (arg == "--broker")
+ {
+ this.broker = args[++current];
+ }
+ else if (arg == "--port")
+ {
+ arg = args[++current];
+ int i = int.Parse(arg);
+ if (i > 0)
+ {
+ this.port = i;
+ }
+ }
+ else if (arg == "--base-name")
+ {
+ this.baseName = args[++current];
+ }
+
+ else if (arg == "--tx")
+ {
+ arg = args[++current];
+ int i = int.Parse(arg);
+ if (i > 0)
+ {
+ this.subTxSize = i;
+ this.pubTxSize = i;
+ }
+ }
+
+ else if (arg == "--durable")
+ {
+ arg = args[++current];
+ if (arg.Equals("yes"))
+ {
+ this.durable = true;
+ }
+ }
+
+ else if (arg == "--protocol")
+ {
+ arg = args[++current];
+ if (arg.Equals("ssl"))
+ {
+ this.ssl = true;
+ }
+ }
+
+ else if (arg == "--username")
+ {
+ this.username = args[++current];
+ }
+
+ else if (arg == "--password")
+ {
+ this.password = args[++current];
+ }
+
+ else if (arg == "--mechanism")
+ {
+ arg = args[++current];
+ if (arg.Equals("PLAIN", StringComparison.OrdinalIgnoreCase))
+ {
+ this.saslMechanism = SaslMechanism.Plain;
+ }
+ }
+
+ else
+ {
+ throw new ArgumentException(String.Format("unknown argument \"{0}\"", arg));
+ }
+
+ current++;
+ }
+
+ if (this.saslMechanism == SaslMechanism.Plain)
+ {
+ // use guest/guest as defaults if neither is specified
+ if ((this.username == null) && (this.password == null))
+ {
+ this.username = "guest";
+ this.password = "guest";
+ }
+ else
+ {
+ if (this.username == null)
+ {
+ this.username = "";
+ }
+ if (this.password == null)
+ {
+ this.password = "";
+ }
+ }
+ }
+
+ }
+ }
+
+
+ public class Client
+ {
+ protected Options opts;
+
+ public static void Expect(string actual, string expect)
+ {
+ if (expect != actual)
+ {
+ throw new Exception("Expecting " + expect + " but received " + actual);
+ }
+ }
+
+ public static void Close(IChannel channel)
+ {
+ if (channel == null)
+ {
+ return;
+ }
+
+ try
+ {
+ channel.Close();
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("channel close exception {0}", e);
+ }
+ }
+
+ public string Fqn(string name)
+ {
+ return opts.baseName + '_' + name;
+ }
+ }
+
+
+ public class WcfPerftest
+ {
+ static void WarmUpTransactionSubsystem(Options opts)
+ {
+ // see if any use of transactions is expected
+ if ((opts.type == ClientType.Publisher) && (opts.pubTxSize == 0))
+ return;
+
+ if ((opts.type == ClientType.Subscriber) && (opts.subTxSize == 0))
+ return;
+
+ if (opts.type == ClientType.InteropDemo)
+ {
+ if ((opts.subTxSize == 0) && (opts.pubTxSize == 0))
+ return;
+ }
+
+ Console.WriteLine("Initializing transactions");
+ IRawBodyUtility bodyUtil = new RawEncoderUtility();
+
+ // Send a transacted message to nowhere to force the initial registration with MSDTC.
+ // MSDTC insists on verifying it can contact the resource in the manner expected for
+ // recovery. This requires setting up and finishing a separate connection to the
+ // broker by a thread owned by the DTC. Excluding this time allows the existing
+ // reporting mechanisms to better reflect the cost per transaction without requiring
+ // long test runs.
+ IOutputChannel channel = QueueChannelFactory.CreateWriterChannel("amq.direct", Guid.NewGuid().ToString());
+ Message msg = bodyUtil.CreateMessage("sacrificial transacted message from WcfPerftest");
+ using (TransactionScope ts = new TransactionScope())
+ {
+ channel.Send(msg);
+ // abort/rollback
+ ts.Dispose();
+ }
+ channel.Close();
+ Console.WriteLine("transaction resource manager ready");
+ }
+
+
+ // demonstrate message exchange between WcfPerftest.exe and native
+ // C++ perftest.exe
+
+ static void InteropDemo(Options opts)
+ {
+ string perftest_cpp_exe = "qpid-perftest.exe";
+ string commonArgs = String.Format(" --count {0} --size {1} --broker {2} --port {3}", opts.messageCount, opts.messageSize, opts.broker, opts.port);
+
+ if (opts.durable)
+ {
+ commonArgs += " --durable yes";
+ }
+
+ if (opts.ssl)
+ {
+ commonArgs += " --protocol ssl";
+ }
+
+ if (opts.saslMechanism == SaslMechanism.Plain)
+ {
+ commonArgs += String.Format(" --username {0} --password {1} --mechanism PLAIN", opts.username, opts.password);
+ }
+
+ Console.WriteLine("===== WCF Subscriber and C++ Publisher =====");
+
+ Process setup = new Process();
+ setup.StartInfo.FileName = perftest_cpp_exe;
+ setup.StartInfo.UseShellExecute = false;
+ setup.StartInfo.Arguments = "--setup" + commonArgs;
+ try
+ {
+ setup.Start();
+ }
+ catch (Win32Exception win32e)
+ {
+ Console.WriteLine("Cannot execute {0}: PATH not set?", perftest_cpp_exe);
+ Console.WriteLine(" Error: {0}", win32e.Message);
+ return;
+ }
+ setup.WaitForExit();
+
+ Process control = new Process();
+ control.StartInfo.FileName = perftest_cpp_exe;
+ control.StartInfo.UseShellExecute = false;
+ control.StartInfo.Arguments = "--control" + commonArgs;
+ control.Start();
+
+ Process publish = new Process();
+ publish.StartInfo.FileName = perftest_cpp_exe;
+ publish.StartInfo.UseShellExecute = false;
+ publish.StartInfo.Arguments = "--publish" + commonArgs;
+ publish.Start();
+
+ SubscribeThread subscribeWcf = new SubscribeThread(opts.baseName + "0", opts);
+ Thread subThread = new Thread(subscribeWcf.Run);
+ subThread.Start();
+
+ subThread.Join();
+ publish.WaitForExit();
+ control.WaitForExit();
+
+ Console.WriteLine();
+ Console.WriteLine("===== WCF Publisher and C++ Subscriber =====");
+
+ setup = new Process();
+ setup.StartInfo.FileName = perftest_cpp_exe;
+ setup.StartInfo.UseShellExecute = false;
+ setup.StartInfo.Arguments = "--setup" + commonArgs;
+ setup.Start();
+ setup.WaitForExit();
+
+ control = new Process();
+ control.StartInfo.FileName = perftest_cpp_exe;
+ control.StartInfo.UseShellExecute = false;
+ control.StartInfo.Arguments = "--control" + commonArgs;
+ control.Start();
+
+ PublishThread pub = new PublishThread(opts.baseName + "0", "", opts);
+ Thread pubThread = new Thread(pub.Run);
+ pubThread.Start();
+
+ Process subscribeCpp = new Process();
+ subscribeCpp.StartInfo.FileName = perftest_cpp_exe;
+ subscribeCpp.StartInfo.UseShellExecute = false;
+ subscribeCpp.StartInfo.Arguments = "--subscribe" + commonArgs;
+ subscribeCpp.Start();
+
+ subscribeCpp.WaitForExit();
+ pubThread.Join();
+ control.WaitForExit();
+ }
+
+ static void Main(string[] mainArgs)
+ {
+ Options opts = new Options();
+ opts.Parse(mainArgs);
+ QueueChannelFactory.SetBroker(opts.broker, opts.port);
+ QueueChannelFactory.SetSecurity(opts.ssl, opts.username, opts.password);
+
+ WarmUpTransactionSubsystem(opts);
+
+ if (opts.type == ClientType.Publisher)
+ {
+ PublishThread pub = new PublishThread(opts.baseName + "0", "", opts);
+ Thread pubThread = new Thread(pub.Run);
+ pubThread.Start();
+ pubThread.Join();
+ }
+ else if (opts.type == ClientType.Subscriber)
+ {
+ SubscribeThread sub = new SubscribeThread(opts.baseName + "0", opts);
+ Thread subThread = new Thread(sub.Run);
+ subThread.Start();
+ subThread.Join();
+ }
+ else
+ {
+ InteropDemo(opts);
+ }
+
+ if (System.Diagnostics.Debugger.IsAttached)
+ {
+ Console.WriteLine("Hit return to continue...");
+ Console.ReadLine();
+ }
+ }
+ }
+
+ public class PublishThread : Client
+ {
+ string destination; // exchange/queue
+ string routingKey;
+ int msgSize;
+ UInt64 msgCount;
+ IOutputChannel publishQueue;
+
+ public PublishThread(string key, string q, Options opts)
+ {
+ this.routingKey = key;
+ this.destination = q;
+ this.msgSize = opts.messageSize;
+ this.msgCount = opts.messageCount;
+ this.opts = opts;
+ }
+
+ static void StampSequenceNo(byte[] data, UInt64 n)
+ {
+ int wordLen = IntPtr.Size; // mimic size_t in C++
+
+ if (data.Length < wordLen)
+ throw new ArgumentException("message size");
+ for (int i = 0; i < wordLen; i++)
+ {
+ data[i] = (byte) (n & 0xff);
+ n >>= 8;
+ }
+ }
+
+ public void Run()
+ {
+ IRawBodyUtility bodyUtil = new RawEncoderUtility();
+
+ IInputChannel startQueue = null;
+ IOutputChannel doneQueue = null;
+ UInt64 batchSize = (UInt64)opts.pubTxSize;
+ bool txPending = false;
+ AmqpProperties amqpProperties = null;
+
+ if (opts.durable)
+ {
+ amqpProperties = new AmqpProperties();
+ amqpProperties.Durable = true;
+ }
+
+ try
+ {
+ publishQueue = QueueChannelFactory.CreateWriterChannel(this.destination, this.routingKey);
+ doneQueue = QueueChannelFactory.CreateWriterChannel("", this.Fqn("pub_done"));
+ startQueue = QueueChannelFactory.CreateReaderChannel(this.Fqn("pub_start"));
+
+ // wait for our start signal
+ Message msg;
+ msg = startQueue.Receive(TimeSpan.MaxValue);
+ Expect(bodyUtil.GetText(msg), "start");
+ msg.Close();
+
+ Stopwatch stopwatch = new Stopwatch();
+ AsyncCallback sendCallback = new AsyncCallback(this.AsyncSendCB);
+
+ byte[] data = new byte[this.msgSize];
+ IAsyncResult sendResult = null;
+
+ Console.WriteLine("sending {0}", this.msgCount);
+ stopwatch.Start();
+
+ if (batchSize > 0)
+ {
+ Transaction.Current = new CommittableTransaction();
+ }
+
+ for (UInt64 i = 0; i < this.msgCount; i++)
+ {
+ StampSequenceNo(data, i);
+ msg = bodyUtil.CreateMessage(data);
+ if (amqpProperties != null)
+ {
+ msg.Properties.Add("AmqpProperties", amqpProperties);
+ }
+
+ sendResult = publishQueue.BeginSend(msg, TimeSpan.MaxValue, sendCallback, msg);
+
+ if (batchSize > 0)
+ {
+ txPending = true;
+ if (((i + 1) % batchSize) == 0)
+ {
+ ((CommittableTransaction)Transaction.Current).Commit();
+ txPending = false;
+ Transaction.Current = new CommittableTransaction();
+ }
+ }
+ }
+
+ if (txPending)
+ {
+ ((CommittableTransaction)Transaction.Current).Commit();
+ }
+
+ Transaction.Current = null;
+
+ sendResult.AsyncWaitHandle.WaitOne();
+ stopwatch.Stop();
+
+ double mps = (msgCount / stopwatch.Elapsed.TotalSeconds);
+
+ msg = bodyUtil.CreateMessage(String.Format("{0:0.##}", mps));
+ doneQueue.Send(msg, TimeSpan.MaxValue);
+ msg.Close();
+ }
+ finally
+ {
+ Close((IChannel)doneQueue);
+ Close((IChannel)publishQueue);
+ Close(startQueue);
+ }
+ }
+
+ void AsyncSendCB(IAsyncResult result)
+ {
+ publishQueue.EndSend(result);
+ ((Message)result.AsyncState).Close();
+ }
+ }
+
+ public class SubscribeThread : Client
+ {
+ string queue;
+ int msgSize;
+ UInt64 msgCount;
+ IInputChannel subscribeQueue;
+
+ public SubscribeThread(string q, Options opts)
+ {
+ this.queue = q;
+ this.msgSize = opts.messageSize;
+ this.msgCount = opts.messageCount;
+ this.opts = opts;
+ }
+
+ static UInt64 GetSequenceNumber(byte[] data)
+ {
+ int wordLen = IntPtr.Size; // mimic size_t in C++
+
+ if (data.Length < wordLen)
+ throw new ArgumentException("message size");
+ UInt64 n = 0;
+ for (int i = (wordLen - 1); i >= 0; i--)
+ {
+ n = (256 * n) + data[i];
+ }
+ return n;
+ }
+
+ public void Run()
+ {
+ IRawBodyUtility bodyUtil = new RawEncoderUtility();
+
+ IOutputChannel readyQueue = null;
+ IOutputChannel doneQueue = null;
+ UInt64 batchSize = (UInt64)opts.subTxSize;
+ bool txPending = false;
+ byte[] data = null;
+
+ try
+ {
+ this.subscribeQueue = QueueChannelFactory.CreateReaderChannel(this.queue);
+ readyQueue = QueueChannelFactory.CreateWriterChannel("", this.Fqn("sub_ready"));
+ doneQueue = QueueChannelFactory.CreateWriterChannel("", this.Fqn("sub_done"));
+
+ Message msg = bodyUtil.CreateMessage("ready");
+ readyQueue.Send(msg, TimeSpan.MaxValue);
+ msg.Close();
+
+
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.Start();
+
+ Console.WriteLine("receiving {0}", this.msgCount);
+ UInt64 expect = 0;
+
+ if (batchSize > 0)
+ {
+ Transaction.Current = new CommittableTransaction();
+ }
+
+ for (UInt64 i = 0; i < this.msgCount; i++)
+ {
+ msg = subscribeQueue.Receive(TimeSpan.MaxValue);
+
+ data = bodyUtil.GetBytes(msg, data);
+ msg.Close();
+ if (data.Length != this.msgSize)
+ {
+ throw new Exception("subscribe message size mismatch");
+ }
+
+ UInt64 n = GetSequenceNumber(data);
+ if (n != expect)
+ {
+ throw new Exception(String.Format("message sequence error. expected {0} got {1}", expect, n));
+ }
+ expect = n + 1;
+
+ if (batchSize > 0)
+ {
+ txPending = true;
+ if (((i + 1) % batchSize) == 0)
+ {
+ ((CommittableTransaction)Transaction.Current).Commit();
+ txPending = false;
+ Transaction.Current = new CommittableTransaction();
+ }
+ }
+ }
+
+ if (txPending)
+ {
+ ((CommittableTransaction)Transaction.Current).Commit();
+ }
+
+ Transaction.Current = null;
+
+ stopwatch.Stop();
+
+ double mps = (msgCount / stopwatch.Elapsed.TotalSeconds);
+
+ msg = bodyUtil.CreateMessage(String.Format("{0:0.##}", mps));
+ doneQueue.Send(msg, TimeSpan.MaxValue);
+ msg.Close();
+
+ subscribeQueue.Close();
+ }
+ finally
+ {
+ Close((IChannel)doneQueue);
+ Close((IChannel)this.subscribeQueue);
+ Close(readyQueue);
+ }
+ }
+ }
+}
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj
new file mode 100644
index 0000000000..44ef998a24
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <RootNamespace>Apache.Qpid.Test.Channel.WcfPerftest</RootNamespace>
+ <AssemblyName>WcfPerftest</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Transactions" />
+ <Reference Include="System.XML" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="WcfPerftest.cs" />
+ <Compile Include="RawBodyUtility.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\..\..\..\..\src\Apache\Qpid\Channel\Channel.csproj">
+ <Project>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</Project>
+ <Name>Channel</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\..\..\..\..\src\Apache\Qpid\Interop\Interop.vcproj">
+ <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+ <Name>Interop</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>