summaryrefslogtreecommitdiff
path: root/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs')
-rw-r--r--qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs280
1 files changed, 280 insertions, 0 deletions
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs
new file mode 100644
index 0000000000..20af98fa64
--- /dev/null
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs
@@ -0,0 +1,280 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class ChannelReceiver : ChannelEntity
+ {
+ public ChannelReceiver(ChannelContextParameters contextParameters, Binding channelBinding)
+ : base(contextParameters, channelBinding)
+ {
+ }
+
+ public override void Run(string listenUri)
+ {
+ IChannelListener<IInputChannel> listener = this.Binding.BuildChannelListener<IInputChannel>(new Uri(listenUri));
+ listener.Open();
+
+ if (this.Parameters.WaitForChannel)
+ {
+ this.WaitForChannel(listener, this.Parameters.AsyncWaitForChannel, this.Parameters.WaitForChannelTimeout);
+ }
+
+ this.AcceptChannelAndReceive(listener);
+
+ if (listener.State != CommunicationState.Closed)
+ {
+ listener.Close();
+ }
+ }
+
+ private void AcceptChannelAndReceive(IChannelListener<IInputChannel> listener)
+ {
+ IInputChannel channel;
+ TransactionScope transactionToAbortOnAccept = null;
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept = new TransactionScope(TransactionScopeOption.RequiresNew);
+ }
+
+ if (this.Parameters.AsyncAccept)
+ {
+ IAsyncResult result = listener.BeginAcceptChannel(null, null);
+ channel = listener.EndAcceptChannel(result);
+ }
+ else
+ {
+ channel = listener.AcceptChannel();
+ }
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept.Dispose();
+ }
+
+ channel.Open();
+ Message message;
+
+ if (this.Parameters.CloseListenerEarly)
+ {
+ listener.Close();
+ }
+
+ try
+ {
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ Message firstMessage = channel.Receive(this.Parameters.ReceiveTimeout);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", firstMessage.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ }
+ catch (TimeoutException)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add("Receive timed out.");
+ }
+
+ channel.Abort();
+ return;
+ }
+
+ AutoResetEvent doneReceiving = new AutoResetEvent(false);
+ int threadsCompleted = 0;
+
+ for (int i = 0; i < this.Parameters.NumberOfThreads; ++i)
+ {
+ ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object unused)
+ {
+ do
+ {
+ if (this.Parameters.ReceiverShouldAbort)
+ {
+ this.ReceiveMessage(channel, false);
+ Thread.Sleep(200);
+ }
+
+ message = this.ReceiveMessage(channel, true);
+ }
+ while (message != null);
+
+ if (Interlocked.Increment(ref threadsCompleted) == this.Parameters.NumberOfThreads)
+ {
+ doneReceiving.Set();
+ }
+ }));
+ }
+
+ TimeSpan threadTimeout = TimeSpan.FromMinutes(2.0);
+ if (!doneReceiving.WaitOne(threadTimeout, false))
+ {
+ this.Results.Add(String.Format("Threads did not complete within {0}.", threadTimeout));
+ }
+
+ channel.Close();
+ }
+
+ private Message ReceiveMessage(IInputChannel channel, bool commit)
+ {
+ Message message = null;
+
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required))
+ {
+ bool messageDetected = false;
+ if (this.Parameters.AsyncWaitForMessage)
+ {
+ IAsyncResult result = channel.BeginWaitForMessage(this.Parameters.WaitForMessageTimeout, null, null);
+ messageDetected = channel.EndWaitForMessage(result);
+ }
+ else
+ {
+ messageDetected = channel.WaitForMessage(this.Parameters.WaitForMessageTimeout);
+ }
+
+ if (this.Parameters.WaitForMessage)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("WaitForMessage returned {0}", messageDetected));
+ }
+ }
+
+ if (messageDetected)
+ {
+ if (this.Parameters.AsyncReceive)
+ {
+ if (this.Parameters.TryReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ bool ret = channel.EndTryReceive(result, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ IAsyncResult result = channel.BeginReceive(this.Parameters.ReceiveTimeout, null, null);
+ message = channel.EndReceive(result);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ message = channel.Receive(this.Parameters.ReceiveTimeout);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = false;
+ if (this.Parameters.AsyncReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ if (this.Parameters.TryReceiveNullIAsyncResult)
+ {
+ try
+ {
+ channel.EndTryReceive(null, out message);
+ }
+ catch (Exception e)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive threw {0}", e.GetType().Name));
+ }
+ }
+ }
+
+ ret = channel.EndTryReceive(result, out message);
+ }
+ else
+ {
+ ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+ }
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ this.Results.Add(String.Format("Message was {0}", (message == null ? "null" : "not null")));
+ }
+ }
+
+ message = null;
+ }
+
+ if (commit && message != null)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", message.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ else
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+
+ return message;
+ }
+ }
+}