diff options
Diffstat (limited to 'qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs')
-rw-r--r-- | qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs new file mode 100644 index 0000000000..23bed6c603 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs @@ -0,0 +1,190 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Threading; + using NUnit.Framework; + + [TestFixture] + public class AsyncTest + { + private const int MessageCount = 20; + private const string Queue = "amqp:amq.direct?routingkey=routing_key"; + private Uri endpoint = new Uri("amqp:message_queue"); + private TimeSpan standardTimeout = TimeSpan.FromSeconds(10.0); // seconds + + [Test] + public void NonTryReceives() + { + this.SendMessages(this.standardTimeout, this.standardTimeout); + this.ReceiveNonTryMessages(this.standardTimeout, this.standardTimeout); + } + + [Test] + public void TryReceives() + { + this.SendMessages(this.standardTimeout, this.standardTimeout); + this.ReceiveTryMessages(this.standardTimeout, this.standardTimeout); + } + + [Test] + public void SmallTimeout() + { + // This code is commented out due to a bug in asynchronous channel open. + ////IChannelListener parentListener; + ////try + ////{ + //// this.RetrieveAsyncChannel(new Uri("amqp:fake_queue_do_not_create"), TimeSpan.FromMilliseconds(10.0), out parentListener); + //// parentListener.Close(); + //// Assert.Fail("Accepting the channel did not time out."); + ////} + ////catch (TimeoutException) + ////{ + //// // Intended exception. + ////} + + try + { + this.ReceiveNonTryMessages(this.standardTimeout, TimeSpan.FromMilliseconds(10.0)); + Assert.Fail("Receiving a message did not time out."); + } + catch (TimeoutException) + { + // Intended exception. + } + } + + private void SendMessages(TimeSpan channelTimeout, TimeSpan messageSendTimeout) + { + ChannelFactory<IOutputChannel> channelFactory = + new ChannelFactory<IOutputChannel>(Util.GetBinding(), Queue); + IOutputChannel proxy = channelFactory.CreateChannel(); + IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; + + for (int i = 0; i < MessageCount; i++) + { + Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, i); + resultArray[i] = proxy.BeginSend(toSend, messageSendTimeout, null, null); + } + + for (int j = 0; j < MessageCount; j++) + { + proxy.EndSend(resultArray[j]); + } + + IAsyncResult iocCloseResult = proxy.BeginClose(channelTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + proxy.EndClose(iocCloseResult); + + IAsyncResult chanFactCloseResult = channelFactory.BeginClose(channelTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + channelFactory.EndClose(chanFactCloseResult); + } + + private void ReceiveNonTryMessages(TimeSpan channelTimeout, TimeSpan messageTimeout) + { + IChannelListener inputChannelParentListener; + IInputChannel inputChannel = this.RetrieveAsyncChannel(this.endpoint, channelTimeout, out inputChannelParentListener); + + inputChannel.Open(); + + IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; + try + { + for (int i = 0; i < MessageCount; i++) + { + resultArray[i] = inputChannel.BeginReceive(messageTimeout, null, null); + } + + for (int j = 0; j < MessageCount; j++) + { + inputChannel.EndReceive(resultArray[j]); + } + } + finally + { + IAsyncResult channelCloseResult = inputChannel.BeginClose(channelTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + inputChannel.EndClose(channelCloseResult); + + // Asynchronous listener close has not been implemented. + ////IAsyncResult listenerCloseResult = inputChannelParentListener.BeginClose(channelTimeout, null, null); + ////Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + ////inputChannelParentListener.EndClose(listenerCloseResult); + + inputChannelParentListener.Close(); + } + } + + private void ReceiveTryMessages(TimeSpan channelAcceptTimeout, TimeSpan messageReceiveTimeout) + { + IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(this.endpoint, new BindingParameterCollection()); + listener.Open(); + IInputChannel inputChannel = listener.AcceptChannel(channelAcceptTimeout); + IAsyncResult channelResult = inputChannel.BeginOpen(channelAcceptTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); + inputChannel.EndOpen(channelResult); + + IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; + + for (int i = 0; i < MessageCount; i++) + { + resultArray[i] = inputChannel.BeginTryReceive(messageReceiveTimeout, null, null); + } + + for (int j = 0; j < MessageCount; j++) + { + Message tempMessage; + Assert.True(inputChannel.EndTryReceive(resultArray[j], out tempMessage), "Did not successfully receive message #{0}", j); + } + + inputChannel.Close(); + listener.Close(); + } + + private IInputChannel RetrieveAsyncChannel(Uri queue, TimeSpan timeout, out IChannelListener parentListener) + { + IChannelListener<IInputChannel> listener = + Util.GetBinding().BuildChannelListener<IInputChannel>(queue, new BindingParameterCollection()); + listener.Open(); + IInputChannel inputChannel; + + try + { + IAsyncResult acceptResult = listener.BeginAcceptChannel(timeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(300.0)); // Dummy work + inputChannel = listener.EndAcceptChannel(acceptResult); + } + catch (TimeoutException) + { + listener.Close(); + throw; + } + finally + { + parentListener = listener; + } + return inputChannel; + } + } +} |