/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ namespace Apache.Qpid.Test.Channel.Functional { using System; using System.ServiceModel; using System.ServiceModel.Channels; using System.Threading; using NUnit.Framework; [TestFixture] public class AsyncTest { private const int MessageCount = 20; private const string Queue = "amqp:amq.direct?routingkey=routing_key"; private Uri endpoint = new Uri("amqp:message_queue"); private TimeSpan standardTimeout = TimeSpan.FromSeconds(10.0); // seconds [Test] public void NonTryReceives() { this.SendMessages(this.standardTimeout, this.standardTimeout); this.ReceiveNonTryMessages(this.standardTimeout, this.standardTimeout); } [Test] public void TryReceives() { this.SendMessages(this.standardTimeout, this.standardTimeout); this.ReceiveTryMessages(this.standardTimeout, this.standardTimeout); } [Test] public void SmallTimeout() { // This code is commented out due to a bug in asynchronous channel open. ////IChannelListener parentListener; ////try ////{ //// this.RetrieveAsyncChannel(new Uri("amqp:fake_queue_do_not_create"), TimeSpan.FromMilliseconds(10.0), out parentListener); //// parentListener.Close(); //// Assert.Fail("Accepting the channel did not time out."); ////} ////catch (TimeoutException) ////{ //// // Intended exception. ////} try { this.ReceiveNonTryMessages(this.standardTimeout, TimeSpan.FromMilliseconds(10.0)); Assert.Fail("Receiving a message did not time out."); } catch (TimeoutException) { // Intended exception. } } private void SendMessages(TimeSpan channelTimeout, TimeSpan messageSendTimeout) { ChannelFactory channelFactory = new ChannelFactory(Util.GetBinding(), Queue); IOutputChannel proxy = channelFactory.CreateChannel(); IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; for (int i = 0; i < MessageCount; i++) { Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, i); resultArray[i] = proxy.BeginSend(toSend, messageSendTimeout, null, null); } for (int j = 0; j < MessageCount; j++) { proxy.EndSend(resultArray[j]); } IAsyncResult iocCloseResult = proxy.BeginClose(channelTimeout, null, null); Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work proxy.EndClose(iocCloseResult); IAsyncResult chanFactCloseResult = channelFactory.BeginClose(channelTimeout, null, null); Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work channelFactory.EndClose(chanFactCloseResult); } private void ReceiveNonTryMessages(TimeSpan channelTimeout, TimeSpan messageTimeout) { IChannelListener inputChannelParentListener; IInputChannel inputChannel = this.RetrieveAsyncChannel(this.endpoint, channelTimeout, out inputChannelParentListener); inputChannel.Open(); IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; try { for (int i = 0; i < MessageCount; i++) { resultArray[i] = inputChannel.BeginReceive(messageTimeout, null, null); } for (int j = 0; j < MessageCount; j++) { inputChannel.EndReceive(resultArray[j]); } } finally { IAsyncResult channelCloseResult = inputChannel.BeginClose(channelTimeout, null, null); Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work inputChannel.EndClose(channelCloseResult); // Asynchronous listener close has not been implemented. ////IAsyncResult listenerCloseResult = inputChannelParentListener.BeginClose(channelTimeout, null, null); ////Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work ////inputChannelParentListener.EndClose(listenerCloseResult); inputChannelParentListener.Close(); } } private void ReceiveTryMessages(TimeSpan channelAcceptTimeout, TimeSpan messageReceiveTimeout) { IChannelListener listener = Util.GetBinding().BuildChannelListener(this.endpoint, new BindingParameterCollection()); listener.Open(); IInputChannel inputChannel = listener.AcceptChannel(channelAcceptTimeout); IAsyncResult channelResult = inputChannel.BeginOpen(channelAcceptTimeout, null, null); Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); inputChannel.EndOpen(channelResult); IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; for (int i = 0; i < MessageCount; i++) { resultArray[i] = inputChannel.BeginTryReceive(messageReceiveTimeout, null, null); } for (int j = 0; j < MessageCount; j++) { Message tempMessage; Assert.True(inputChannel.EndTryReceive(resultArray[j], out tempMessage), "Did not successfully receive message #{0}", j); } inputChannel.Close(); listener.Close(); } private IInputChannel RetrieveAsyncChannel(Uri queue, TimeSpan timeout, out IChannelListener parentListener) { IChannelListener listener = Util.GetBinding().BuildChannelListener(queue, new BindingParameterCollection()); listener.Open(); IInputChannel inputChannel; try { IAsyncResult acceptResult = listener.BeginAcceptChannel(timeout, null, null); Thread.Sleep(TimeSpan.FromMilliseconds(300.0)); // Dummy work inputChannel = listener.EndAcceptChannel(acceptResult); } catch (TimeoutException) { listener.Close(); throw; } finally { parentListener = listener; } return inputChannel; } } }