summaryrefslogtreecommitdiff
path: root/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs')
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs190
1 files changed, 190 insertions, 0 deletions
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
new file mode 100644
index 0000000000..23bed6c603
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
@@ -0,0 +1,190 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class AsyncTest
+ {
+ private const int MessageCount = 20;
+ private const string Queue = "amqp:amq.direct?routingkey=routing_key";
+ private Uri endpoint = new Uri("amqp:message_queue");
+ private TimeSpan standardTimeout = TimeSpan.FromSeconds(10.0); // seconds
+
+ [Test]
+ public void NonTryReceives()
+ {
+ this.SendMessages(this.standardTimeout, this.standardTimeout);
+ this.ReceiveNonTryMessages(this.standardTimeout, this.standardTimeout);
+ }
+
+ [Test]
+ public void TryReceives()
+ {
+ this.SendMessages(this.standardTimeout, this.standardTimeout);
+ this.ReceiveTryMessages(this.standardTimeout, this.standardTimeout);
+ }
+
+ [Test]
+ public void SmallTimeout()
+ {
+ // This code is commented out due to a bug in asynchronous channel open.
+ ////IChannelListener parentListener;
+ ////try
+ ////{
+ //// this.RetrieveAsyncChannel(new Uri("amqp:fake_queue_do_not_create"), TimeSpan.FromMilliseconds(10.0), out parentListener);
+ //// parentListener.Close();
+ //// Assert.Fail("Accepting the channel did not time out.");
+ ////}
+ ////catch (TimeoutException)
+ ////{
+ //// // Intended exception.
+ ////}
+
+ try
+ {
+ this.ReceiveNonTryMessages(this.standardTimeout, TimeSpan.FromMilliseconds(10.0));
+ Assert.Fail("Receiving a message did not time out.");
+ }
+ catch (TimeoutException)
+ {
+ // Intended exception.
+ }
+ }
+
+ private void SendMessages(TimeSpan channelTimeout, TimeSpan messageSendTimeout)
+ {
+ ChannelFactory<IOutputChannel> channelFactory =
+ new ChannelFactory<IOutputChannel>(Util.GetBinding(), Queue);
+ IOutputChannel proxy = channelFactory.CreateChannel();
+ IAsyncResult[] resultArray = new IAsyncResult[MessageCount];
+
+ for (int i = 0; i < MessageCount; i++)
+ {
+ Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, i);
+ resultArray[i] = proxy.BeginSend(toSend, messageSendTimeout, null, null);
+ }
+
+ for (int j = 0; j < MessageCount; j++)
+ {
+ proxy.EndSend(resultArray[j]);
+ }
+
+ IAsyncResult iocCloseResult = proxy.BeginClose(channelTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ proxy.EndClose(iocCloseResult);
+
+ IAsyncResult chanFactCloseResult = channelFactory.BeginClose(channelTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ channelFactory.EndClose(chanFactCloseResult);
+ }
+
+ private void ReceiveNonTryMessages(TimeSpan channelTimeout, TimeSpan messageTimeout)
+ {
+ IChannelListener inputChannelParentListener;
+ IInputChannel inputChannel = this.RetrieveAsyncChannel(this.endpoint, channelTimeout, out inputChannelParentListener);
+
+ inputChannel.Open();
+
+ IAsyncResult[] resultArray = new IAsyncResult[MessageCount];
+ try
+ {
+ for (int i = 0; i < MessageCount; i++)
+ {
+ resultArray[i] = inputChannel.BeginReceive(messageTimeout, null, null);
+ }
+
+ for (int j = 0; j < MessageCount; j++)
+ {
+ inputChannel.EndReceive(resultArray[j]);
+ }
+ }
+ finally
+ {
+ IAsyncResult channelCloseResult = inputChannel.BeginClose(channelTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ inputChannel.EndClose(channelCloseResult);
+
+ // Asynchronous listener close has not been implemented.
+ ////IAsyncResult listenerCloseResult = inputChannelParentListener.BeginClose(channelTimeout, null, null);
+ ////Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work
+ ////inputChannelParentListener.EndClose(listenerCloseResult);
+
+ inputChannelParentListener.Close();
+ }
+ }
+
+ private void ReceiveTryMessages(TimeSpan channelAcceptTimeout, TimeSpan messageReceiveTimeout)
+ {
+ IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(this.endpoint, new BindingParameterCollection());
+ listener.Open();
+ IInputChannel inputChannel = listener.AcceptChannel(channelAcceptTimeout);
+ IAsyncResult channelResult = inputChannel.BeginOpen(channelAcceptTimeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(50.0));
+ inputChannel.EndOpen(channelResult);
+
+ IAsyncResult[] resultArray = new IAsyncResult[MessageCount];
+
+ for (int i = 0; i < MessageCount; i++)
+ {
+ resultArray[i] = inputChannel.BeginTryReceive(messageReceiveTimeout, null, null);
+ }
+
+ for (int j = 0; j < MessageCount; j++)
+ {
+ Message tempMessage;
+ Assert.True(inputChannel.EndTryReceive(resultArray[j], out tempMessage), "Did not successfully receive message #{0}", j);
+ }
+
+ inputChannel.Close();
+ listener.Close();
+ }
+
+ private IInputChannel RetrieveAsyncChannel(Uri queue, TimeSpan timeout, out IChannelListener parentListener)
+ {
+ IChannelListener<IInputChannel> listener =
+ Util.GetBinding().BuildChannelListener<IInputChannel>(queue, new BindingParameterCollection());
+ listener.Open();
+ IInputChannel inputChannel;
+
+ try
+ {
+ IAsyncResult acceptResult = listener.BeginAcceptChannel(timeout, null, null);
+ Thread.Sleep(TimeSpan.FromMilliseconds(300.0)); // Dummy work
+ inputChannel = listener.EndAcceptChannel(acceptResult);
+ }
+ catch (TimeoutException)
+ {
+ listener.Close();
+ throw;
+ }
+ finally
+ {
+ parentListener = listener;
+ }
+ return inputChannel;
+ }
+ }
+}