From e4a121846bdabda5b7d1edc2ebf33f453ee61ede Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Thu, 17 Dec 2009 16:43:44 +0000 Subject: Apply patch QPID-2230.patch; resolves QPID-2230. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@891795 13f79535-47bb-0310-9956-ffa450edef68 --- .../Test/Channel/WcfPerftest/RawBodyUtility.cs | 161 +++++ .../Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs | 661 +++++++++++++++++++++ .../Test/Channel/WcfPerftest/WcfPerftest.csproj | 83 +++ 3 files changed, 905 insertions(+) create mode 100644 qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs create mode 100644 qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs create mode 100644 qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj (limited to 'qpid/wcf/test') diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs new file mode 100644 index 0000000000..55a01c790c --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs @@ -0,0 +1,161 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.WcfPerftest +{ + using System; + using System.Collections; + using System.IO; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using System.Threading; + using System.Text; + using System.Xml; + using Apache.Qpid.Channel; + + + /// + /// A sample interface for populating and extracting message body content. + /// Just enough methods to handle basic Interop text and raw byte messages. + /// + + + public interface IRawBodyUtility + { + Message CreateMessage(byte[] body, int offset, int len); + Message CreateMessage(byte[] body); + byte[] GetBytes(Message m, byte[] recyclableBuffer); + + Message CreateMessage(string body); + string GetText(Message m); + } + + // an implementation of IRawBodyUtility that expects a RawMessageEncoder based channel + + public class RawEncoderUtility : IRawBodyUtility + { + public Message CreateMessage(byte[] body, int offset, int count) + { + return Message.CreateMessage(MessageVersion.None, "", new RawEncoderBodyWriter(body, offset, count)); + } + + public Message CreateMessage(byte[] body) + { + return CreateMessage(body, 0, body.Length); + } + + public byte[] GetBytes(Message message, byte[] recyclableBuffer) + { + XmlDictionaryReader reader = message.GetReaderAtBodyContents(); + int length; + + while (!reader.HasValue) + { + reader.Read(); + if (reader.EOF) + { + throw new InvalidDataException("empty XmlDictionaryReader"); + } + } + + if (reader.TryGetBase64ContentLength(out length)) + { + byte[] bytes = null; + if (recyclableBuffer != null) + { + if (recyclableBuffer.Length == length) + { + // reuse + bytes = recyclableBuffer; + } + } + + if (bytes == null) + { + bytes = new byte[length]; + } + + // this is the single copy mechanism from native to managed space with no intervening + // buffers. One could also write a method GetBytes(msg, myBuf, offset)... + reader.ReadContentAsBase64(bytes, 0, length); + reader.Close(); + return bytes; + } + else + { + // uses whatever default buffering mechanism is used by the base XmlDictionaryReader class + return reader.ReadContentAsBase64(); + } + } + + public Message CreateMessage(string body) + { + return Message.CreateMessage(MessageVersion.None, "", new RawEncoderBodyWriter(body)); + } + + public string GetText(Message message) + { + byte[] rawBuffer = GetBytes(message, null); + return Encoding.UTF8.GetString(rawBuffer, 0, rawBuffer.Length); + } + + internal class RawEncoderBodyWriter : BodyWriter + { + // works only with the Raw Encoder; the "body" is either a single string or byte[] segment + String bodyAsString; + byte[] bodyAsBytes; + int offset; + int count; + + public RawEncoderBodyWriter(string body) + : base(false) // isBuffered + { + this.bodyAsString = body; + } + + public RawEncoderBodyWriter(byte[] body, int offset, int count) + : base(false) // isBuffered + { + this.bodyAsBytes = body; + this.offset = offset; + this.count = count; + } + + protected override void OnWriteBodyContents(System.Xml.XmlDictionaryWriter writer) + { + // TODO: RawMessageEncoder.StreamElementName should be public. + writer.WriteStartElement("Binary"); // the expected Raw encoder "" virtual xml tag + + if (bodyAsString != null) + { + byte[] buf = Encoding.UTF8.GetBytes(bodyAsString); + writer.WriteBase64(buf, 0, buf.Length); + } + else + { + writer.WriteBase64(this.bodyAsBytes, this.offset, this.count); + } + + writer.WriteEndElement(); + } + } + } + +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs new file mode 100644 index 0000000000..992d6e9bd2 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs @@ -0,0 +1,661 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.WcfPerftest +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.ComponentModel; + using System.Configuration; + using System.Diagnostics; + using System.IO; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using System.Threading; + using System.Transactions; + using System.Text; + using System.Xml; + using Apache.Qpid.AmqpTypes; + using Apache.Qpid.Channel; + + // this program implements a subset of the functionality in qpid\cpp\src\tests\perftest.cpp + + // for a given broker, create reader and writer channels to queues/exchanges + // lazilly creates binding and channel factories + + public class QueueChannelFactory + { + private static AmqpBinding brokerBinding; + private static IChannelFactory readerFactory; + private static IChannelFactory writerFactory; + private static string brokerAddr = "127.0.0.1"; + private static int brokerPort = 5672; + + public static void SetBroker(string addr, int port) + { + brokerAddr = addr; + brokerPort = port; + } + + private static void InitializeBinding() + { + AmqpBinaryBinding binding = new AmqpBinaryBinding(); + binding.BrokerHost = brokerAddr; + binding.BrokerPort = brokerPort; + binding.TransferMode = TransferMode.Streamed; + binding.PrefetchLimit = 5000; + binding.Shared = true; + brokerBinding = binding; + } + + public static IInputChannel CreateReaderChannel(string queueName) + { + lock (typeof(QueueChannelFactory)) + { + if (brokerBinding == null) + { + InitializeBinding(); + } + + if (readerFactory == null) + { + readerFactory = brokerBinding.BuildChannelFactory(); + readerFactory.Open(); + } + + IInputChannel channel = readerFactory.CreateChannel(new EndpointAddress( + new Uri("amqp:" + queueName))); + channel.Open(); + + return channel; + } + } + + public static IOutputChannel CreateWriterChannel(string exchangeName, string routingKey) + { + lock (typeof(QueueChannelFactory)) + { + if (brokerBinding == null) + { + InitializeBinding(); + } + + if (writerFactory == null) + { + writerFactory = brokerBinding.BuildChannelFactory(); + writerFactory.Open(); + } + + IOutputChannel channel = writerFactory.CreateChannel(new EndpointAddress( + "amqp:" + exchangeName + + "?routingkey=" + routingKey)); + channel.Open(); + + return channel; + } + } + } + + public enum ClientType + { + Publisher, + Subscriber, + InteropDemo + } + + public class Options + { + public string broker; + public int port; + public UInt64 messageCount; + public int messageSize; + public ClientType type; + public string baseName; + public int subTxSize; + public int pubTxSize; + public bool durable; + + public Options() + { + this.broker = "127.0.0.1"; + this.port = 5672; + this.messageCount = 500000; + this.messageSize = 1024; + this.type = ClientType.InteropDemo; // default: once as pub and once as sub + this.baseName = "perftest"; + this.pubTxSize = 0; + this.subTxSize = 0; + this.durable = false; + } + + public void Parse(string[] args) + { + int argCount = args.Length; + int current = 0; + bool typeSelected = false; + + while (current < argCount) + { + string arg = args[current]; + if (arg == "--publish") + { + if (typeSelected) + throw new ArgumentException("too many roles"); + + this.type = ClientType.Publisher; + typeSelected = true; + } + else if (arg == "--subscribe") + { + if (typeSelected) + throw new ArgumentException("too many roles"); + + this.type = ClientType.Subscriber; + typeSelected = true; + } + else if (arg == "--size") + { + arg = args[++current]; + int i = int.Parse(arg); + if (i > 0) + { + this.messageSize = i; + } + } + else if (arg == "--count") + { + arg = args[++current]; + UInt64 i = UInt64.Parse(arg); + if (i > 0) + { + this.messageCount = i; + } + } + else if (arg == "--broker") + { + this.broker = args[++current]; + } + else if (arg == "--port") + { + arg = args[++current]; + int i = int.Parse(arg); + if (i > 0) + { + this.port = i; + } + } + else if (arg == "--base-name") + { + this.baseName = args[++current]; + } + + else if (arg == "--tx") + { + arg = args[++current]; + int i = int.Parse(arg); + if (i > 0) + { + this.subTxSize = i; + this.pubTxSize = i; + } + } + + else if (arg == "--durable") + { + arg = args[++current]; + if (arg.Equals("yes")) + { + this.durable = true; + } + } + + current++; + } + } + } + + + public class Client + { + protected Options opts; + + public static void Expect(string actual, string expect) + { + if (expect != actual) + { + throw new Exception("Expecting " + expect + " but received " + actual); + } + } + + public static void Close(IChannel channel) + { + if (channel == null) + { + return; + } + + try + { + channel.Close(); + } + catch (Exception e) + { + Console.WriteLine("channel close exception {0}", e); + } + } + + public string Fqn(string name) + { + return opts.baseName + '_' + name; + } + } + + + public class WcfPerftest + { + static void WarmUpTransactionSubsystem(Options opts) + { + // see if any use of transactions is expected + if ((opts.type == ClientType.Publisher) && (opts.pubTxSize == 0)) + return; + + if ((opts.type == ClientType.Subscriber) && (opts.subTxSize == 0)) + return; + + if (opts.type == ClientType.InteropDemo) + { + if ((opts.subTxSize == 0) && (opts.pubTxSize == 0)) + return; + } + + Console.WriteLine("Initializing transactions"); + IRawBodyUtility bodyUtil = new RawEncoderUtility(); + + // send a transacted message to nowhere to force the initial registration with MSDTC + IOutputChannel channel = QueueChannelFactory.CreateWriterChannel("", Guid.NewGuid().ToString()); + Message msg = bodyUtil.CreateMessage("sacrificial transacted message from WcfPerftest"); + using (TransactionScope ts = new TransactionScope()) + { + channel.Send(msg); + // abort/rollback + ts.Dispose(); + } + channel.Close(); + Console.WriteLine("transaction resource manager ready"); + } + + static void InteropDemo(Options opts) + { + string perftest_cpp_exe = "perftest.exe"; + string commonArgs = String.Format(" --count {0} --size {1}", opts.messageCount, opts.messageSize); + + if (opts.durable) + { + commonArgs += " --durable yes"; + } + + Console.WriteLine("===== WCF Subscriber and C++ Publisher ====="); + + Process setup = new Process(); + setup.StartInfo.FileName = perftest_cpp_exe; + setup.StartInfo.UseShellExecute = false; + setup.StartInfo.Arguments = "--setup" + commonArgs; + try + { + setup.Start(); + } + catch (Win32Exception win32e) + { + Console.WriteLine("Cannot execute {0}: PATH not set?", perftest_cpp_exe); + Console.WriteLine(" Error: {0}", win32e.Message); + return; + } + setup.WaitForExit(); + + Process control = new Process(); + control.StartInfo.FileName = perftest_cpp_exe; + control.StartInfo.UseShellExecute = false; + control.StartInfo.Arguments = "--control" + commonArgs; + control.Start(); + + Process publish = new Process(); + publish.StartInfo.FileName = perftest_cpp_exe; + publish.StartInfo.UseShellExecute = false; + publish.StartInfo.Arguments = "--publish" + commonArgs; + publish.Start(); + + SubscribeThread subscribeWcf = new SubscribeThread(opts.baseName + "0", opts); + Thread subThread = new Thread(subscribeWcf.Run); + subThread.Start(); + + subThread.Join(); + publish.WaitForExit(); + control.WaitForExit(); + + Console.WriteLine(); + Console.WriteLine("===== WCF Publisher and C++ Subscriber ====="); + + setup = new Process(); + setup.StartInfo.FileName = perftest_cpp_exe; + setup.StartInfo.UseShellExecute = false; + setup.StartInfo.Arguments = "--setup" + commonArgs; + setup.Start(); + setup.WaitForExit(); + + control = new Process(); + control.StartInfo.FileName = perftest_cpp_exe; + control.StartInfo.UseShellExecute = false; + control.StartInfo.Arguments = "--control" + commonArgs; + control.Start(); + + PublishThread pub = new PublishThread(opts.baseName + "0", "", opts); + Thread pubThread = new Thread(pub.Run); + pubThread.Start(); + + Process subscribeCpp = new Process(); + subscribeCpp.StartInfo.FileName = perftest_cpp_exe; + subscribeCpp.StartInfo.UseShellExecute = false; + subscribeCpp.StartInfo.Arguments = "--subscribe" + commonArgs; + subscribeCpp.Start(); + + subscribeCpp.WaitForExit(); + pubThread.Join(); + control.WaitForExit(); + } + + static void Main(string[] mainArgs) + { + Options opts = new Options(); + opts.Parse(mainArgs); + QueueChannelFactory.SetBroker(opts.broker, opts.port); + + WarmUpTransactionSubsystem(opts); + + if (opts.type == ClientType.Publisher) + { + PublishThread pub = new PublishThread(opts.baseName + "0", "", opts); + Thread pubThread = new Thread(pub.Run); + pubThread.Start(); + pubThread.Join(); + } + else if (opts.type == ClientType.Subscriber) + { + SubscribeThread sub = new SubscribeThread(opts.baseName + "0", opts); + Thread subThread = new Thread(sub.Run); + subThread.Start(); + subThread.Join(); + } + else + { + InteropDemo(opts); + } + + if (System.Diagnostics.Debugger.IsAttached) + { + Console.WriteLine("Hit return to continue..."); + Console.ReadLine(); + } + } + } + + public class PublishThread : Client + { + string destination; // exchange/queue + string routingKey; + int msgSize; + UInt64 msgCount; + IOutputChannel publishQueue; + + public PublishThread(string key, string q, Options opts) + { + this.routingKey = key; + this.destination = q; + this.msgSize = opts.messageSize; + this.msgCount = opts.messageCount; + this.opts = opts; + } + + static void StampSequenceNo(byte[] data, UInt64 n) + { + int wordLen = IntPtr.Size; // mimic size_t in C++ + + if (data.Length < wordLen) + throw new ArgumentException("message size"); + for (int i = 0; i < wordLen; i++) + { + data[i] = (byte) (n & 0xff); + n >>= 8; + } + } + + public void Run() + { + IRawBodyUtility bodyUtil = new RawEncoderUtility(); + + IInputChannel startQueue = null; + IOutputChannel doneQueue = null; + UInt64 batchSize = (UInt64)opts.pubTxSize; + bool txPending = false; + AmqpProperties amqpProperties = null; + + if (opts.durable) + { + amqpProperties = new AmqpProperties(); + amqpProperties.Durable = true; + } + + try + { + publishQueue = QueueChannelFactory.CreateWriterChannel(this.destination, this.routingKey); + doneQueue = QueueChannelFactory.CreateWriterChannel("", this.Fqn("pub_done")); + startQueue = QueueChannelFactory.CreateReaderChannel(this.Fqn("pub_start")); + + // wait for our start signal + Message msg; + msg = startQueue.Receive(TimeSpan.MaxValue); + Expect(bodyUtil.GetText(msg), "start"); + msg.Close(); + + Stopwatch stopwatch = new Stopwatch(); + AsyncCallback sendCallback = new AsyncCallback(this.AsyncSendCB); + + byte[] data = new byte[this.msgSize]; + IAsyncResult sendResult = null; + + Console.WriteLine("sending {0}", this.msgCount); + stopwatch.Start(); + + if (batchSize > 0) + { + Transaction.Current = new CommittableTransaction(); + } + + for (UInt64 i = 0; i < this.msgCount; i++) + { + StampSequenceNo(data, i); + msg = bodyUtil.CreateMessage(data); + if (amqpProperties != null) + { + msg.Properties.Add("AmqpProperties", amqpProperties); + } + + sendResult = publishQueue.BeginSend(msg, TimeSpan.MaxValue, sendCallback, msg); + + if (batchSize > 0) + { + txPending = true; + if (((i + 1) % batchSize) == 0) + { + ((CommittableTransaction)Transaction.Current).Commit(); + txPending = false; + Transaction.Current = new CommittableTransaction(); + } + } + } + + if (txPending) + { + ((CommittableTransaction)Transaction.Current).Commit(); + } + + Transaction.Current = null; + + sendResult.AsyncWaitHandle.WaitOne(); + stopwatch.Stop(); + + double mps = (msgCount / stopwatch.Elapsed.TotalSeconds); + + msg = bodyUtil.CreateMessage(String.Format("{0:0.##}", mps)); + doneQueue.Send(msg, TimeSpan.MaxValue); + msg.Close(); + } + finally + { + Close((IChannel)doneQueue); + Close((IChannel)publishQueue); + Close(startQueue); + } + } + + void AsyncSendCB(IAsyncResult result) + { + publishQueue.EndSend(result); + ((Message)result.AsyncState).Close(); + } + } + + public class SubscribeThread : Client + { + string queue; + int msgSize; + UInt64 msgCount; + IInputChannel subscribeQueue; + + public SubscribeThread(string q, Options opts) + { + this.queue = q; + this.msgSize = opts.messageSize; + this.msgCount = opts.messageCount; + this.opts = opts; + } + + static UInt64 GetSequenceNumber(byte[] data) + { + int wordLen = IntPtr.Size; // mimic size_t in C++ + + if (data.Length < wordLen) + throw new ArgumentException("message size"); + UInt64 n = 0; + for (int i = (wordLen - 1); i >= 0; i--) + { + n = (256 * n) + data[i]; + } + return n; + } + + public void Run() + { + IRawBodyUtility bodyUtil = new RawEncoderUtility(); + + IOutputChannel readyQueue = null; + IOutputChannel doneQueue = null; + UInt64 batchSize = (UInt64)opts.subTxSize; + bool txPending = false; + byte[] data = null; + + try + { + this.subscribeQueue = QueueChannelFactory.CreateReaderChannel(this.queue); + readyQueue = QueueChannelFactory.CreateWriterChannel("", this.Fqn("sub_ready")); + doneQueue = QueueChannelFactory.CreateWriterChannel("", this.Fqn("sub_done")); + + Message msg = bodyUtil.CreateMessage("ready"); + readyQueue.Send(msg, TimeSpan.MaxValue); + msg.Close(); + + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.Start(); + + Console.WriteLine("receiving {0}", this.msgCount); + UInt64 expect = 0; + + if (batchSize > 0) + { + Transaction.Current = new CommittableTransaction(); + } + + for (UInt64 i = 0; i < this.msgCount; i++) + { + msg = subscribeQueue.Receive(TimeSpan.MaxValue); + + data = bodyUtil.GetBytes(msg, data); + msg.Close(); + if (data.Length != this.msgSize) + { + throw new Exception("subscribe message size mismatch"); + } + + UInt64 n = GetSequenceNumber(data); + if (n != expect) + { + throw new Exception(String.Format("message sequence error. expected {0} got {1}", expect, n)); + } + expect = n + 1; + + if (batchSize > 0) + { + txPending = true; + if (((i + 1) % batchSize) == 0) + { + ((CommittableTransaction)Transaction.Current).Commit(); + txPending = false; + Transaction.Current = new CommittableTransaction(); + } + } + } + + if (txPending) + { + ((CommittableTransaction)Transaction.Current).Commit(); + } + + Transaction.Current = null; + + stopwatch.Stop(); + + double mps = (msgCount / stopwatch.Elapsed.TotalSeconds); + + msg = bodyUtil.CreateMessage(String.Format("{0:0.##}", mps)); + doneQueue.Send(msg, TimeSpan.MaxValue); + msg.Close(); + + subscribeQueue.Close(); + } + finally + { + Close((IChannel)doneQueue); + Close((IChannel)this.subscribeQueue); + Close(readyQueue); + } + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj new file mode 100644 index 0000000000..44ef998a24 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj @@ -0,0 +1,83 @@ + + + + + Debug + AnyCPU + 9.0.21022 + 2.0 + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733} + Exe + Apache.Qpid.Test.Channel.WcfPerftest + WcfPerftest + v3.5 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + 3.0 + + + 3.0 + + + + + + + + + + + {8AABAB30-7D1E-4539-B7D1-05450262BAD2} + Channel + + + {C9B6AC75-6332-47A4-B82B-0C20E0AF2D34} + Interop + + + + + -- cgit v1.2.1