summaryrefslogtreecommitdiff
path: root/qpid/dotnet/client-010/perftest/PerfTest.cs
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/dotnet/client-010/perftest/PerfTest.cs')
-rw-r--r--qpid/dotnet/client-010/perftest/PerfTest.cs715
1 files changed, 715 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/perftest/PerfTest.cs b/qpid/dotnet/client-010/perftest/PerfTest.cs
new file mode 100644
index 0000000000..c94dd865d5
--- /dev/null
+++ b/qpid/dotnet/client-010/perftest/PerfTest.cs
@@ -0,0 +1,715 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using System.Threading;
+using common.org.apache.qpid.transport.util;
+using org.apache.qpid.client;
+using org.apache.qpid.transport;
+using org.apache.qpid.transport.util;
+using Plossum.CommandLine;
+
+namespace PerfTest
+{
+ [CommandLineManager(ApplicationName = "Qpid Perf Tests", Copyright = "Apache Software Foundation")]
+ public class Options
+ {
+ [CommandLineOption(Description = "Displays this help text")]
+ public bool Help;
+
+ [CommandLineOption(Description = "Create shared queues.", MinOccurs = 0)]
+ public Boolean Setup;
+
+ [CommandLineOption(Description = "Run test, print report.", MinOccurs = 0)]
+ public Boolean Control;
+
+ [CommandLineOption(Description = "Publish messages.", MinOccurs = 0)]
+ public Boolean Publish;
+
+ [CommandLineOption(Description = "Subscribe for messages.", MinOccurs = 0)]
+ public Boolean Subscribe;
+
+ [CommandLineOption(Description = "Test mode: [shared|fanout|topic]", MinOccurs = 0)]
+ public string Mode
+ {
+ get { return _mMode; }
+ set
+ {
+ if (! value.Equals("shared") && ! value.Equals("fanout") && ! value.Equals("topic"))
+ throw new InvalidOptionValueException(
+ "The mode must not be shared|fanout|topic", false);
+ _mMode = value;
+ }
+ }
+
+ private string _mMode = "shared";
+
+ [CommandLineOption(Description = "Specifies the broler name", MinOccurs = 0)]
+ public string Broker
+ {
+ get { return _broker; }
+ set
+ {
+ if (String.IsNullOrEmpty(value))
+ throw new InvalidOptionValueException(
+ "The broker name must not be empty", false);
+ _broker = value;
+ }
+ }
+
+ private string _broker = "localhost";
+
+ [CommandLineOption(Description = "Specifies the port name", MinOccurs = 0)]
+ public int Port
+ {
+ get { return _port; }
+ set { _port = value; }
+ }
+
+ private int _port = 5672;
+
+ #region Publisher
+
+ [CommandLineOption(Description = "Create N publishers.", MinOccurs = 0)]
+ public int Pubs
+ {
+ get { return _pubs; }
+ set { _pubs = value; }
+ }
+
+ private int _pubs = 1;
+
+ [CommandLineOption(Description = "Each publisher sends N messages.", MinOccurs = 0)]
+ public double Count
+ {
+ get { return _count; }
+ set { _count = value; }
+ }
+
+ private double _count = 5000;
+
+ [CommandLineOption(Description = "Size of messages in bytes.", MinOccurs = 0)]
+ public long Size
+ {
+ get { return _size; }
+ set { _size = value; }
+ }
+
+ private long _size = 1024;
+
+ [CommandLineOption(Description = "Publisher use confirm-mode.", MinOccurs = 0)]
+ public Boolean Confirm = true;
+
+ [CommandLineOption(Description = "Publish messages as durable.", MinOccurs = 0)]
+ public Boolean Durable;
+
+ [CommandLineOption(Description = "Make data for each message unique.", MinOccurs = 0)]
+ public Boolean UniqueData;
+
+ [CommandLineOption(Description = "Wait for confirmation of each message before sending the next one.",
+ MinOccurs = 0)]
+ public Boolean SyncPub;
+
+ [CommandLineOption(Description = ">=0 delay between msg publish.", MinOccurs = 0)]
+ public double IntervalPub
+ {
+ get { return _interval_pub; }
+ set { _interval_pub = value; }
+ }
+
+ private double _interval_pub;
+
+ #endregion
+
+ #region Subscriber
+
+ [CommandLineOption(Description = "Create N subscribers.", MinOccurs = 0)]
+ public int Subs
+ {
+ get { return _subs; }
+ set { _subs = value; }
+ }
+
+ private int _subs = 1;
+
+ [CommandLineOption(Description = "N>0: Subscriber acks batches of N.\n N==0: Subscriber uses unconfirmed mode",
+ MinOccurs = 0)]
+ public int SubAck
+ {
+ get { return _suback; }
+ set { _suback = value; }
+ }
+
+ private int _suback;
+
+ [CommandLineOption(Description = ">=0 delay between msg consume", MinOccurs = 0)]
+ public double IntervalSub
+ {
+ get { return _interval_sub; }
+ set { _interval_sub = value; }
+ }
+
+ private double _interval_sub;
+
+ #endregion
+
+ [CommandLineOption(Description = "Create N queues.", MinOccurs = 0)]
+ public int Queues
+ {
+ get { return _qt; }
+ set { _qt = value; }
+ }
+
+ private int _qt = 1;
+
+ [CommandLineOption(Description = "Desired number of iterations of the test.", MinOccurs = 0)]
+ public int Iterations
+ {
+ get { return _iterations; }
+ set { _iterations = value; }
+ }
+
+ private int _iterations = 1;
+
+ [CommandLineOption(Description = "If non-zero, the transaction batch size.", MinOccurs = 0)]
+ public int Tx
+ {
+ get { return _tx; }
+ set { _tx = value; }
+ }
+
+ private int _tx;
+
+ [CommandLineOption(Description = "Make queue durable (implied if durable set.", MinOccurs = 0)]
+ public Boolean QueueDurable;
+
+ [CommandLineOption(Description = "Queue policy: count to trigger 'flow to disk'", MinOccurs = 0)]
+ public double QueueMaxCount
+ {
+ get { return _queueMaxCount; }
+ set { _queueMaxCount = value; }
+ }
+
+ private double _queueMaxCount;
+
+ [CommandLineOption(Description = "Queue policy: accumulated size to trigger 'flow to disk'", MinOccurs = 0)]
+ public double QueueMaxSize
+ {
+ get { return _queueMaxSize; }
+ set { _queueMaxSize = value; }
+ }
+
+ private double _queueMaxSize;
+
+ public double SubQuota
+ {
+ get { return _subQuota; }
+ set { _subQuota = value; }
+ }
+
+ private double _subQuota;
+ }
+
+ internal interface Startable
+ {
+ void Start();
+ }
+
+ public abstract class PerfTestClient : Startable
+ {
+ private readonly IClient _connection;
+ private readonly IClientSession _session;
+ private readonly Options _options;
+
+ public IClientSession Session
+ {
+ get { return _session; }
+ }
+
+ public Options Options
+ {
+ get { return _options; }
+ }
+
+ protected PerfTestClient(Options options)
+ {
+ _options = options;
+ _connection = new Client();
+ _connection.Connect(options.Broker, options.Port, "test", "guest", "guest");
+ _session = _connection.CreateSession(50000);
+ }
+
+ public abstract void Start();
+ }
+
+ public class SetupTest : PerfTestClient
+ {
+ public SetupTest(Options options)
+ : base(options)
+ {
+ }
+
+ private void queueInit(String name, Boolean durable, Dictionary<String, Object> arguments)
+ {
+ Session.QueueDeclare(name, null, arguments, durable ? Option.DURABLE : Option.NONE);
+ Session.QueuePurge(name);
+ Session.ExchangeBind(name, "amq.direct", name);
+ Session.Sync();
+ }
+
+ public override void Start()
+ {
+ queueInit("pub_start", false, null);
+ queueInit("pub_done", false, null);
+ queueInit("sub_ready", false, null);
+ queueInit("sub_done", false, null);
+ if (Options.Mode.Equals("shared"))
+ {
+ Dictionary<String, Object> settings = new Dictionary<string, object>();
+ if (Options.QueueMaxCount > 0)
+ settings.Add("qpid.max_count", Options.QueueMaxCount);
+ if (Options.QueueMaxSize > 0)
+ settings.Add("qpid.max_size", Options.QueueMaxSize);
+ for (int i = 0; i < Options.Queues; ++i)
+ {
+ string qname = "perftest" + i;
+ queueInit(qname, Options.Durable || Options.QueueDurable, settings);
+ }
+ }
+ }
+ }
+
+ public class SubscribeThread : PerfTestClient
+ {
+ private readonly string _queue;
+
+ public SubscribeThread(Options options, string key, string exchange)
+ : base(options)
+ {
+ _queue = "perftest" + (new UUID(10, 10));
+ Session.QueueDeclare(_queue, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE,
+ Options.Durable ? Option.DURABLE : Option.NONE);
+ Session.ExchangeBind(_queue, exchange, key);
+ }
+
+ public SubscribeThread(Options options, string key)
+ : base(options)
+ {
+ _queue = key;
+ }
+
+ public override void Start()
+ {
+ if (Options.Tx > 0)
+ {
+ Session.TxSelect();
+ Session.Sync();
+ }
+ CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
+ // Create a listener and subscribe it to the queue named "message_queue"
+ IMessageListener listener = new SyncListener(buffer);
+
+ string dest = "dest" + UUID.RandomUuid();
+ Session.AttachMessageListener(listener, dest);
+ Session.MessageSubscribe(_queue, dest,
+ Options.Tx > 0 || Options.SubAck > 0
+ ? MessageAcceptMode.EXPLICIT
+ : MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
+ // issue credits
+ Session.MessageSetFlowMode(dest, MessageFlowMode.WINDOW);
+ Session.MessageFlow(dest, MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+
+ // Notify controller we are ready.
+ IMessage message = new Message();
+ message.DeliveryProperties.SetRoutingKey("sub_ready");
+
+ message.AppendData(Encoding.UTF8.GetBytes("ready"));
+ Session.MessageTransfer("amq.direct", message);
+
+ if (Options.Tx > 0)
+ {
+ Session.TxCommit();
+ Session.Sync();
+ }
+
+
+ for (int j = 0; j < Options.Iterations; ++j)
+ {
+
+ //need to allocate some more credit
+ Session.MessageFlow(dest, MessageCreditUnit.MESSAGE, (long)Options.SubQuota);
+
+ RangeSet range = new RangeSet();
+ IMessage msg;
+ DateTime start = DateTime.Now;
+ for (long i = 0; i < Options.SubQuota; ++i)
+ {
+ msg = buffer.Dequeue();
+ if (Options.Tx > 0 && ((i + 1)%Options.Tx == 0))
+ {
+ Session.TxCommit();
+ Session.Sync();
+ }
+ if (Options.IntervalSub > 0)
+ {
+ Thread.Sleep((int) Options.IntervalSub*1000);
+ }
+ range.Add(msg.Id);
+ }
+ if (Options.Tx > 0 || Options.SubAck > 0)
+ Session.MessageAccept(range);
+ range.Clear();
+ if (Options.Tx > 0)
+ {
+ Session.TxSelect();
+ Session.Sync();
+ }
+ DateTime end = DateTime.Now;
+
+ // Report to publisher.
+ message.DeliveryProperties.SetRoutingKey("sub_done");
+ message.ClearData();
+ message.AppendData(BitConverter.GetBytes(Options.SubQuota / end.Subtract(start).TotalMilliseconds ));
+ Session.MessageTransfer("amq.direct", message);
+ if (Options.Tx > 0)
+ {
+ Session.TxSelect();
+ Session.Sync();
+ }
+ }
+ Session.Close();
+ }
+ }
+
+ public class SyncListener : IMessageListener
+ {
+ private readonly CircularBuffer<IMessage> _buffer;
+
+ public SyncListener(CircularBuffer<IMessage> buffer)
+ {
+ _buffer = buffer;
+ }
+
+ public void MessageTransfer(IMessage m)
+ {
+ _buffer.Enqueue(m);
+ }
+ }
+
+
+ public class PublishThread : PerfTestClient
+ {
+ private readonly string _exchange;
+ private readonly string _key;
+
+ public PublishThread(Options options, string key, string exchange)
+ : base(options)
+ {
+ _key = key;
+ _exchange = exchange;
+ }
+
+
+ public override void Start()
+ {
+ byte[] data = new byte[Options.Size];
+ // randomly populate data
+ Random r = new Random(34);
+ r.NextBytes(data);
+ IMessage message = new Message();
+ message.AppendData(data);
+
+ message.DeliveryProperties.SetRoutingKey(_key);
+
+ if (Options.Durable)
+ message.DeliveryProperties.SetDeliveryMode(MessageDeliveryMode.PERSISTENT);
+
+ if (Options.Tx > 0)
+ {
+ Session.TxSelect();
+ Session.Sync();
+ }
+
+ CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
+ // Create a listener and subscribe it to the queue named "pub_start"
+ IMessageListener listener = new SyncListener(buffer);
+ string localQueue = "localQueue-" + UUID.RandomUuid().ToString();
+ Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
+ Session.ExchangeBind(localQueue, "amq.direct", "pub_start");
+ Session.AttachMessageListener(listener, localQueue);
+ Session.MessageSubscribe(localQueue);
+ if (Options.Tx > 0)
+ {
+ Session.TxCommit();
+ Session.Sync();
+ }
+ buffer.Dequeue();
+
+ for (int j = 0; j < Options.Iterations; ++j)
+ {
+ DateTime start = DateTime.Now;
+ for (long i = 0; i < Options.Count; ++i)
+ {
+ Session.MessageTransfer(_exchange, message);
+
+ if (Options.SyncPub)
+ {
+ Session.Sync();
+ }
+ if (Options.Tx > 0 && (i + 1)%Options.Tx == 0)
+ {
+ Session.TxSelect();
+ Session.Sync();
+ }
+ if (Options.IntervalPub > 0)
+ {
+ Thread.Sleep((int) Options.IntervalSub*1000);
+ }
+ }
+ Session.Sync();
+ DateTime end = DateTime.Now;
+
+ // Report to publisher.
+ message.DeliveryProperties.SetRoutingKey("pub_done");
+ message.ClearData();
+ double time = end.Subtract(start).TotalMilliseconds;
+ byte[] rate = BitConverter.GetBytes( Options.Count / time );
+ message.AppendData(rate);
+ Session.MessageTransfer("amq.direct", message);
+ if (Options.Tx > 0)
+ {
+ Session.TxSelect();
+ Session.Sync();
+ }
+ }
+ Session.Close();
+ }
+ }
+
+ public class Controller : PerfTestClient
+ {
+ public Controller(Options options)
+ : base(options)
+ {
+ }
+
+ private void process(int size, string queue)
+ {
+ CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
+ IMessageListener listener = new SyncListener(buffer);
+ string localQueue = "queue-" + UUID.RandomUuid();
+ Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
+ Session.ExchangeBind(localQueue, "amq.direct", queue);
+ Session.AttachMessageListener(listener, localQueue);
+ Session.MessageSubscribe(localQueue);
+ for (int i = 0; i < size; ++i)
+ {
+ buffer.Dequeue();
+ }
+ }
+
+ private double processRate(int size, string queue)
+ {
+ CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
+ IMessageListener listener = new SyncListener(buffer);
+ string localQueue = "queue-" + UUID.RandomUuid();
+ Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
+ Session.ExchangeBind(localQueue, "amq.direct", queue);
+ Session.AttachMessageListener(listener, localQueue);
+ Session.MessageSubscribe(localQueue);
+ double rate = 0;
+ RangeSet range = new RangeSet();
+ for (int i = 0; i < size; ++i)
+ {
+ IMessage m = buffer.Dequeue();
+ range.Add(m.Id);
+ BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
+ byte[] body = new byte[m.Body.Length - m.Body.Position];
+ reader.Read(body, 0, body.Length);
+ rate += BitConverter.ToDouble(body,0);
+ }
+ Session.MessageAccept(range);
+ return rate;
+ }
+
+ private void send(int size, string queue, string data)
+ {
+ IMessage message = new Message();
+ message.DeliveryProperties.SetRoutingKey(queue);
+ message.AppendData(Encoding.UTF8.GetBytes(data));
+ for (int i = 0; i < size; ++i)
+ {
+ Session.MessageTransfer("amq.direct", message);
+ }
+ }
+
+ public override void Start()
+ {
+ process(Options.Subs, "sub_ready");
+ for (int j = 0; j < Options.Iterations; ++j)
+ {
+ DateTime start = DateTime.Now;
+ send(Options.Pubs, "pub_start", "start"); // Start publishers
+ double pubrate = processRate(Options.Pubs, "pub_done");
+ double subrate = processRate(Options.Subs, "sub_done");
+ DateTime end = DateTime.Now;
+
+ double transfers = (Options.Pubs*Options.Count) + (Options.Subs*Options.SubQuota);
+ double time = end.Subtract(start).TotalSeconds;
+ double txrate = transfers/time;
+ double mbytes = (txrate*Options.Size) / (1024 * 1024) ;
+
+ Console.WriteLine("Total: " + transfers + " transfers of " + Options.Size + " bytes in "
+ + time + " seconds.\n" +
+ "Publish transfers/sec: " + pubrate * 1000 + "\n" +
+ "Subscribe transfers/sec: " + subrate * 1000 + "\n" +
+ "Total transfers/sec: " + txrate + "\n" +
+ "Total Mbytes/sec: " + mbytes);
+ Console.WriteLine("done");
+ }
+
+ }
+ }
+
+
+ public class PerfTest
+ {
+ private static int Main(string[] args)
+ {
+ Options options = new Options();
+ CommandLineParser parser = new CommandLineParser(options);
+ parser.Parse();
+ if (parser.HasErrors)
+ {
+ Console.WriteLine(parser.UsageInfo.GetErrorsAsString(78));
+ return -1;
+ }
+ if (options.Help)
+ {
+ Console.WriteLine(parser.UsageInfo.GetOptionsAsString(78));
+ return 0;
+ }
+ bool singleProcess =
+ (!options.Setup && !options.Control && !options.Publish && !options.Subscribe);
+ if (singleProcess)
+ {
+ options.Setup = options.Control = options.Publish = true;
+ options.Subscribe = true;
+ }
+
+
+ string exchange = "amq.direct";
+ switch (options.Mode)
+ {
+ case "shared":
+ options.SubQuota = (options.Pubs*options.Count)/options.Subs;
+ break;
+ case "fanout":
+ options.SubQuota = (options.Pubs*options.Count);
+ exchange = "amq.fanout";
+ break;
+ case "topic":
+ options.SubQuota = (options.Pubs*options.Count);
+ exchange = "amq.topic";
+ break;
+ }
+
+ if (options.Setup)
+ {
+ SetupTest setup = new SetupTest(options);
+ setup.Start(); // Set up queues
+ }
+
+ Thread contT = null;
+ if ( options.Control)
+ {
+ Controller c = new Controller(options);
+ contT = new Thread(c.Start);
+ contT.Start();
+ }
+
+ Thread[] publishers = null;
+ Thread[] subscribers = null;
+
+ // Start pubs/subs for each queue/topic.
+ for (int i = 0; i < options.Queues; ++i)
+ {
+ string key = "perftest" + i; // Queue or topic name.
+ if (options.Publish)
+ {
+ int n = singleProcess ? options.Pubs : 1;
+ publishers = new Thread[n];
+ for (int j = 0; j < n; ++j)
+ {
+ PublishThread pt = new PublishThread(options, key, exchange);
+ publishers[i] = new Thread(pt.Start);
+ publishers[i].Start();
+ }
+ }
+ if ( options.Subscribe)
+ {
+ int n = singleProcess ? options.Subs : 1;
+ subscribers = new Thread[n];
+ for (int j = 0; j < n; ++j)
+ {
+ SubscribeThread st;
+ if (options.Mode.Equals("shared"))
+ st = new SubscribeThread(options, key);
+ else
+ st = new SubscribeThread(options, key, exchange);
+ subscribers[i] = new Thread(st.Start);
+ subscribers[i].Start();
+ }
+ }
+ }
+
+ if (options.Control)
+ {
+ contT.Join();
+ }
+
+
+ // Wait for started threads.
+ if (options.Publish)
+ {
+ foreach (Thread t in publishers)
+ {
+ t.Join();
+ }
+ }
+
+ if (options.Subscribe)
+ {
+ foreach (Thread t in subscribers)
+ {
+ t.Join();
+ }
+ }
+
+
+ return 0;
+ }
+ }
+}