summaryrefslogtreecommitdiff
path: root/trunk/qpid/dotnet/client-010/perftest/PerfTest.cs
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/dotnet/client-010/perftest/PerfTest.cs')
-rw-r--r--trunk/qpid/dotnet/client-010/perftest/PerfTest.cs715
1 files changed, 0 insertions, 715 deletions
diff --git a/trunk/qpid/dotnet/client-010/perftest/PerfTest.cs b/trunk/qpid/dotnet/client-010/perftest/PerfTest.cs
deleted file mode 100644
index c94dd865d5..0000000000
--- a/trunk/qpid/dotnet/client-010/perftest/PerfTest.cs
+++ /dev/null
@@ -1,715 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Text;
-using System.Threading;
-using common.org.apache.qpid.transport.util;
-using org.apache.qpid.client;
-using org.apache.qpid.transport;
-using org.apache.qpid.transport.util;
-using Plossum.CommandLine;
-
-namespace PerfTest
-{
- [CommandLineManager(ApplicationName = "Qpid Perf Tests", Copyright = "Apache Software Foundation")]
- public class Options
- {
- [CommandLineOption(Description = "Displays this help text")]
- public bool Help;
-
- [CommandLineOption(Description = "Create shared queues.", MinOccurs = 0)]
- public Boolean Setup;
-
- [CommandLineOption(Description = "Run test, print report.", MinOccurs = 0)]
- public Boolean Control;
-
- [CommandLineOption(Description = "Publish messages.", MinOccurs = 0)]
- public Boolean Publish;
-
- [CommandLineOption(Description = "Subscribe for messages.", MinOccurs = 0)]
- public Boolean Subscribe;
-
- [CommandLineOption(Description = "Test mode: [shared|fanout|topic]", MinOccurs = 0)]
- public string Mode
- {
- get { return _mMode; }
- set
- {
- if (! value.Equals("shared") && ! value.Equals("fanout") && ! value.Equals("topic"))
- throw new InvalidOptionValueException(
- "The mode must not be shared|fanout|topic", false);
- _mMode = value;
- }
- }
-
- private string _mMode = "shared";
-
- [CommandLineOption(Description = "Specifies the broler name", MinOccurs = 0)]
- public string Broker
- {
- get { return _broker; }
- set
- {
- if (String.IsNullOrEmpty(value))
- throw new InvalidOptionValueException(
- "The broker name must not be empty", false);
- _broker = value;
- }
- }
-
- private string _broker = "localhost";
-
- [CommandLineOption(Description = "Specifies the port name", MinOccurs = 0)]
- public int Port
- {
- get { return _port; }
- set { _port = value; }
- }
-
- private int _port = 5672;
-
- #region Publisher
-
- [CommandLineOption(Description = "Create N publishers.", MinOccurs = 0)]
- public int Pubs
- {
- get { return _pubs; }
- set { _pubs = value; }
- }
-
- private int _pubs = 1;
-
- [CommandLineOption(Description = "Each publisher sends N messages.", MinOccurs = 0)]
- public double Count
- {
- get { return _count; }
- set { _count = value; }
- }
-
- private double _count = 5000;
-
- [CommandLineOption(Description = "Size of messages in bytes.", MinOccurs = 0)]
- public long Size
- {
- get { return _size; }
- set { _size = value; }
- }
-
- private long _size = 1024;
-
- [CommandLineOption(Description = "Publisher use confirm-mode.", MinOccurs = 0)]
- public Boolean Confirm = true;
-
- [CommandLineOption(Description = "Publish messages as durable.", MinOccurs = 0)]
- public Boolean Durable;
-
- [CommandLineOption(Description = "Make data for each message unique.", MinOccurs = 0)]
- public Boolean UniqueData;
-
- [CommandLineOption(Description = "Wait for confirmation of each message before sending the next one.",
- MinOccurs = 0)]
- public Boolean SyncPub;
-
- [CommandLineOption(Description = ">=0 delay between msg publish.", MinOccurs = 0)]
- public double IntervalPub
- {
- get { return _interval_pub; }
- set { _interval_pub = value; }
- }
-
- private double _interval_pub;
-
- #endregion
-
- #region Subscriber
-
- [CommandLineOption(Description = "Create N subscribers.", MinOccurs = 0)]
- public int Subs
- {
- get { return _subs; }
- set { _subs = value; }
- }
-
- private int _subs = 1;
-
- [CommandLineOption(Description = "N>0: Subscriber acks batches of N.\n N==0: Subscriber uses unconfirmed mode",
- MinOccurs = 0)]
- public int SubAck
- {
- get { return _suback; }
- set { _suback = value; }
- }
-
- private int _suback;
-
- [CommandLineOption(Description = ">=0 delay between msg consume", MinOccurs = 0)]
- public double IntervalSub
- {
- get { return _interval_sub; }
- set { _interval_sub = value; }
- }
-
- private double _interval_sub;
-
- #endregion
-
- [CommandLineOption(Description = "Create N queues.", MinOccurs = 0)]
- public int Queues
- {
- get { return _qt; }
- set { _qt = value; }
- }
-
- private int _qt = 1;
-
- [CommandLineOption(Description = "Desired number of iterations of the test.", MinOccurs = 0)]
- public int Iterations
- {
- get { return _iterations; }
- set { _iterations = value; }
- }
-
- private int _iterations = 1;
-
- [CommandLineOption(Description = "If non-zero, the transaction batch size.", MinOccurs = 0)]
- public int Tx
- {
- get { return _tx; }
- set { _tx = value; }
- }
-
- private int _tx;
-
- [CommandLineOption(Description = "Make queue durable (implied if durable set.", MinOccurs = 0)]
- public Boolean QueueDurable;
-
- [CommandLineOption(Description = "Queue policy: count to trigger 'flow to disk'", MinOccurs = 0)]
- public double QueueMaxCount
- {
- get { return _queueMaxCount; }
- set { _queueMaxCount = value; }
- }
-
- private double _queueMaxCount;
-
- [CommandLineOption(Description = "Queue policy: accumulated size to trigger 'flow to disk'", MinOccurs = 0)]
- public double QueueMaxSize
- {
- get { return _queueMaxSize; }
- set { _queueMaxSize = value; }
- }
-
- private double _queueMaxSize;
-
- public double SubQuota
- {
- get { return _subQuota; }
- set { _subQuota = value; }
- }
-
- private double _subQuota;
- }
-
- internal interface Startable
- {
- void Start();
- }
-
- public abstract class PerfTestClient : Startable
- {
- private readonly IClient _connection;
- private readonly IClientSession _session;
- private readonly Options _options;
-
- public IClientSession Session
- {
- get { return _session; }
- }
-
- public Options Options
- {
- get { return _options; }
- }
-
- protected PerfTestClient(Options options)
- {
- _options = options;
- _connection = new Client();
- _connection.Connect(options.Broker, options.Port, "test", "guest", "guest");
- _session = _connection.CreateSession(50000);
- }
-
- public abstract void Start();
- }
-
- public class SetupTest : PerfTestClient
- {
- public SetupTest(Options options)
- : base(options)
- {
- }
-
- private void queueInit(String name, Boolean durable, Dictionary<String, Object> arguments)
- {
- Session.QueueDeclare(name, null, arguments, durable ? Option.DURABLE : Option.NONE);
- Session.QueuePurge(name);
- Session.ExchangeBind(name, "amq.direct", name);
- Session.Sync();
- }
-
- public override void Start()
- {
- queueInit("pub_start", false, null);
- queueInit("pub_done", false, null);
- queueInit("sub_ready", false, null);
- queueInit("sub_done", false, null);
- if (Options.Mode.Equals("shared"))
- {
- Dictionary<String, Object> settings = new Dictionary<string, object>();
- if (Options.QueueMaxCount > 0)
- settings.Add("qpid.max_count", Options.QueueMaxCount);
- if (Options.QueueMaxSize > 0)
- settings.Add("qpid.max_size", Options.QueueMaxSize);
- for (int i = 0; i < Options.Queues; ++i)
- {
- string qname = "perftest" + i;
- queueInit(qname, Options.Durable || Options.QueueDurable, settings);
- }
- }
- }
- }
-
- public class SubscribeThread : PerfTestClient
- {
- private readonly string _queue;
-
- public SubscribeThread(Options options, string key, string exchange)
- : base(options)
- {
- _queue = "perftest" + (new UUID(10, 10));
- Session.QueueDeclare(_queue, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE,
- Options.Durable ? Option.DURABLE : Option.NONE);
- Session.ExchangeBind(_queue, exchange, key);
- }
-
- public SubscribeThread(Options options, string key)
- : base(options)
- {
- _queue = key;
- }
-
- public override void Start()
- {
- if (Options.Tx > 0)
- {
- Session.TxSelect();
- Session.Sync();
- }
- CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
- // Create a listener and subscribe it to the queue named "message_queue"
- IMessageListener listener = new SyncListener(buffer);
-
- string dest = "dest" + UUID.RandomUuid();
- Session.AttachMessageListener(listener, dest);
- Session.MessageSubscribe(_queue, dest,
- Options.Tx > 0 || Options.SubAck > 0
- ? MessageAcceptMode.EXPLICIT
- : MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
- // issue credits
- Session.MessageSetFlowMode(dest, MessageFlowMode.WINDOW);
- Session.MessageFlow(dest, MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
-
- // Notify controller we are ready.
- IMessage message = new Message();
- message.DeliveryProperties.SetRoutingKey("sub_ready");
-
- message.AppendData(Encoding.UTF8.GetBytes("ready"));
- Session.MessageTransfer("amq.direct", message);
-
- if (Options.Tx > 0)
- {
- Session.TxCommit();
- Session.Sync();
- }
-
-
- for (int j = 0; j < Options.Iterations; ++j)
- {
-
- //need to allocate some more credit
- Session.MessageFlow(dest, MessageCreditUnit.MESSAGE, (long)Options.SubQuota);
-
- RangeSet range = new RangeSet();
- IMessage msg;
- DateTime start = DateTime.Now;
- for (long i = 0; i < Options.SubQuota; ++i)
- {
- msg = buffer.Dequeue();
- if (Options.Tx > 0 && ((i + 1)%Options.Tx == 0))
- {
- Session.TxCommit();
- Session.Sync();
- }
- if (Options.IntervalSub > 0)
- {
- Thread.Sleep((int) Options.IntervalSub*1000);
- }
- range.Add(msg.Id);
- }
- if (Options.Tx > 0 || Options.SubAck > 0)
- Session.MessageAccept(range);
- range.Clear();
- if (Options.Tx > 0)
- {
- Session.TxSelect();
- Session.Sync();
- }
- DateTime end = DateTime.Now;
-
- // Report to publisher.
- message.DeliveryProperties.SetRoutingKey("sub_done");
- message.ClearData();
- message.AppendData(BitConverter.GetBytes(Options.SubQuota / end.Subtract(start).TotalMilliseconds ));
- Session.MessageTransfer("amq.direct", message);
- if (Options.Tx > 0)
- {
- Session.TxSelect();
- Session.Sync();
- }
- }
- Session.Close();
- }
- }
-
- public class SyncListener : IMessageListener
- {
- private readonly CircularBuffer<IMessage> _buffer;
-
- public SyncListener(CircularBuffer<IMessage> buffer)
- {
- _buffer = buffer;
- }
-
- public void MessageTransfer(IMessage m)
- {
- _buffer.Enqueue(m);
- }
- }
-
-
- public class PublishThread : PerfTestClient
- {
- private readonly string _exchange;
- private readonly string _key;
-
- public PublishThread(Options options, string key, string exchange)
- : base(options)
- {
- _key = key;
- _exchange = exchange;
- }
-
-
- public override void Start()
- {
- byte[] data = new byte[Options.Size];
- // randomly populate data
- Random r = new Random(34);
- r.NextBytes(data);
- IMessage message = new Message();
- message.AppendData(data);
-
- message.DeliveryProperties.SetRoutingKey(_key);
-
- if (Options.Durable)
- message.DeliveryProperties.SetDeliveryMode(MessageDeliveryMode.PERSISTENT);
-
- if (Options.Tx > 0)
- {
- Session.TxSelect();
- Session.Sync();
- }
-
- CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
- // Create a listener and subscribe it to the queue named "pub_start"
- IMessageListener listener = new SyncListener(buffer);
- string localQueue = "localQueue-" + UUID.RandomUuid().ToString();
- Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
- Session.ExchangeBind(localQueue, "amq.direct", "pub_start");
- Session.AttachMessageListener(listener, localQueue);
- Session.MessageSubscribe(localQueue);
- if (Options.Tx > 0)
- {
- Session.TxCommit();
- Session.Sync();
- }
- buffer.Dequeue();
-
- for (int j = 0; j < Options.Iterations; ++j)
- {
- DateTime start = DateTime.Now;
- for (long i = 0; i < Options.Count; ++i)
- {
- Session.MessageTransfer(_exchange, message);
-
- if (Options.SyncPub)
- {
- Session.Sync();
- }
- if (Options.Tx > 0 && (i + 1)%Options.Tx == 0)
- {
- Session.TxSelect();
- Session.Sync();
- }
- if (Options.IntervalPub > 0)
- {
- Thread.Sleep((int) Options.IntervalSub*1000);
- }
- }
- Session.Sync();
- DateTime end = DateTime.Now;
-
- // Report to publisher.
- message.DeliveryProperties.SetRoutingKey("pub_done");
- message.ClearData();
- double time = end.Subtract(start).TotalMilliseconds;
- byte[] rate = BitConverter.GetBytes( Options.Count / time );
- message.AppendData(rate);
- Session.MessageTransfer("amq.direct", message);
- if (Options.Tx > 0)
- {
- Session.TxSelect();
- Session.Sync();
- }
- }
- Session.Close();
- }
- }
-
- public class Controller : PerfTestClient
- {
- public Controller(Options options)
- : base(options)
- {
- }
-
- private void process(int size, string queue)
- {
- CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
- IMessageListener listener = new SyncListener(buffer);
- string localQueue = "queue-" + UUID.RandomUuid();
- Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
- Session.ExchangeBind(localQueue, "amq.direct", queue);
- Session.AttachMessageListener(listener, localQueue);
- Session.MessageSubscribe(localQueue);
- for (int i = 0; i < size; ++i)
- {
- buffer.Dequeue();
- }
- }
-
- private double processRate(int size, string queue)
- {
- CircularBuffer<IMessage> buffer = new CircularBuffer<IMessage>(100);
- IMessageListener listener = new SyncListener(buffer);
- string localQueue = "queue-" + UUID.RandomUuid();
- Session.QueueDeclare(localQueue, null, null, Option.AUTO_DELETE);
- Session.ExchangeBind(localQueue, "amq.direct", queue);
- Session.AttachMessageListener(listener, localQueue);
- Session.MessageSubscribe(localQueue);
- double rate = 0;
- RangeSet range = new RangeSet();
- for (int i = 0; i < size; ++i)
- {
- IMessage m = buffer.Dequeue();
- range.Add(m.Id);
- BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
- byte[] body = new byte[m.Body.Length - m.Body.Position];
- reader.Read(body, 0, body.Length);
- rate += BitConverter.ToDouble(body,0);
- }
- Session.MessageAccept(range);
- return rate;
- }
-
- private void send(int size, string queue, string data)
- {
- IMessage message = new Message();
- message.DeliveryProperties.SetRoutingKey(queue);
- message.AppendData(Encoding.UTF8.GetBytes(data));
- for (int i = 0; i < size; ++i)
- {
- Session.MessageTransfer("amq.direct", message);
- }
- }
-
- public override void Start()
- {
- process(Options.Subs, "sub_ready");
- for (int j = 0; j < Options.Iterations; ++j)
- {
- DateTime start = DateTime.Now;
- send(Options.Pubs, "pub_start", "start"); // Start publishers
- double pubrate = processRate(Options.Pubs, "pub_done");
- double subrate = processRate(Options.Subs, "sub_done");
- DateTime end = DateTime.Now;
-
- double transfers = (Options.Pubs*Options.Count) + (Options.Subs*Options.SubQuota);
- double time = end.Subtract(start).TotalSeconds;
- double txrate = transfers/time;
- double mbytes = (txrate*Options.Size) / (1024 * 1024) ;
-
- Console.WriteLine("Total: " + transfers + " transfers of " + Options.Size + " bytes in "
- + time + " seconds.\n" +
- "Publish transfers/sec: " + pubrate * 1000 + "\n" +
- "Subscribe transfers/sec: " + subrate * 1000 + "\n" +
- "Total transfers/sec: " + txrate + "\n" +
- "Total Mbytes/sec: " + mbytes);
- Console.WriteLine("done");
- }
-
- }
- }
-
-
- public class PerfTest
- {
- private static int Main(string[] args)
- {
- Options options = new Options();
- CommandLineParser parser = new CommandLineParser(options);
- parser.Parse();
- if (parser.HasErrors)
- {
- Console.WriteLine(parser.UsageInfo.GetErrorsAsString(78));
- return -1;
- }
- if (options.Help)
- {
- Console.WriteLine(parser.UsageInfo.GetOptionsAsString(78));
- return 0;
- }
- bool singleProcess =
- (!options.Setup && !options.Control && !options.Publish && !options.Subscribe);
- if (singleProcess)
- {
- options.Setup = options.Control = options.Publish = true;
- options.Subscribe = true;
- }
-
-
- string exchange = "amq.direct";
- switch (options.Mode)
- {
- case "shared":
- options.SubQuota = (options.Pubs*options.Count)/options.Subs;
- break;
- case "fanout":
- options.SubQuota = (options.Pubs*options.Count);
- exchange = "amq.fanout";
- break;
- case "topic":
- options.SubQuota = (options.Pubs*options.Count);
- exchange = "amq.topic";
- break;
- }
-
- if (options.Setup)
- {
- SetupTest setup = new SetupTest(options);
- setup.Start(); // Set up queues
- }
-
- Thread contT = null;
- if ( options.Control)
- {
- Controller c = new Controller(options);
- contT = new Thread(c.Start);
- contT.Start();
- }
-
- Thread[] publishers = null;
- Thread[] subscribers = null;
-
- // Start pubs/subs for each queue/topic.
- for (int i = 0; i < options.Queues; ++i)
- {
- string key = "perftest" + i; // Queue or topic name.
- if (options.Publish)
- {
- int n = singleProcess ? options.Pubs : 1;
- publishers = new Thread[n];
- for (int j = 0; j < n; ++j)
- {
- PublishThread pt = new PublishThread(options, key, exchange);
- publishers[i] = new Thread(pt.Start);
- publishers[i].Start();
- }
- }
- if ( options.Subscribe)
- {
- int n = singleProcess ? options.Subs : 1;
- subscribers = new Thread[n];
- for (int j = 0; j < n; ++j)
- {
- SubscribeThread st;
- if (options.Mode.Equals("shared"))
- st = new SubscribeThread(options, key);
- else
- st = new SubscribeThread(options, key, exchange);
- subscribers[i] = new Thread(st.Start);
- subscribers[i].Start();
- }
- }
- }
-
- if (options.Control)
- {
- contT.Join();
- }
-
-
- // Wait for started threads.
- if (options.Publish)
- {
- foreach (Thread t in publishers)
- {
- t.Join();
- }
- }
-
- if (options.Subscribe)
- {
- foreach (Thread t in subscribers)
- {
- t.Join();
- }
- }
-
-
- return 0;
- }
- }
-}