summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-04-09 16:06:02 +0000
committerAlan Conway <aconway@apache.org>2010-04-09 16:06:02 +0000
commita3ad637f4960a7508683f35a8be3b256c941a92a (patch)
tree764a6f8f0441947ef7c54d62c8679d2609fe74d8
parent562660196b4d33ed52fe7788592dc757c527719e (diff)
downloadqpid-python-a3ad637f4960a7508683f35a8be3b256c941a92a.tar.gz
Script to run performance benchmarks using qpid_send and qpid_receive.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@932479 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/Statistics.cpp39
-rwxr-xr-xqpid/cpp/src/tests/qpid_cpp_benchmark106
2 files changed, 129 insertions, 16 deletions
diff --git a/qpid/cpp/src/tests/Statistics.cpp b/qpid/cpp/src/tests/Statistics.cpp
index 3c5a3a6d4e..bcc646ce46 100644
--- a/qpid/cpp/src/tests/Statistics.cpp
+++ b/qpid/cpp/src/tests/Statistics.cpp
@@ -21,10 +21,14 @@
#include "Statistics.h"
#include <qpid/messaging/Message.h>
#include <ostream>
+#include <iomanip>
namespace qpid {
namespace tests {
+using namespace std;
+const int WIDTH=10;
+
Statistic::~Statistic() {}
Throughput::Throughput() : messages(0), started(false) {}
@@ -37,19 +41,19 @@ void Throughput::message(const messaging::Message&) {
}
}
-void Throughput::header(std::ostream& o) const {
- o << "msg/sec";
+void Throughput::header(ostream& o) const {
+ o << setw(WIDTH) << "msg/sec";
}
-void Throughput::report(std::ostream& o) const {
+void Throughput::report(ostream& o) const {
double elapsed(int64_t(sys::Duration(start, sys::now()))/double(sys::TIME_SEC));
- o << messages/elapsed;
+ o << setw(WIDTH) << messages/elapsed;
}
ThroughputAndLatency::ThroughputAndLatency() :
total(0),
- min(std::numeric_limits<double>::max()),
- max(std::numeric_limits<double>::min())
+ min(numeric_limits<double>::max()),
+ max(numeric_limits<double>::min())
{}
void ThroughputAndLatency::message(const messaging::Message& m) {
@@ -67,22 +71,25 @@ void ThroughputAndLatency::message(const messaging::Message& m) {
}
}
-void ThroughputAndLatency::header(std::ostream& o) const {
+void ThroughputAndLatency::header(ostream& o) const {
Throughput::header(o);
- o << " latency(ms)min max avg";
+ o << setw(3*WIDTH) << "latency(ms): min max avg";
}
-void ThroughputAndLatency::report(std::ostream& o) const {
+void ThroughputAndLatency::report(ostream& o) const {
Throughput::report(o);
- o << " ";
if (messages)
- o << min << " " << max << " " << total/messages;
+ o << setw(WIDTH) << min << setw(WIDTH) << max << setw(WIDTH) << total/messages;
else
o << "Can't compute latency for 0 messages.";
}
-ReporterBase::ReporterBase(std::ostream& o, int batch)
- : wantBatch(batch), batchCount(0), headerPrinted(false), out(o) {}
+ReporterBase::ReporterBase(ostream& o, int batch)
+ : wantBatch(batch), batchCount(0), headerPrinted(false), out(o)
+{
+ o.precision(2);
+ o << fixed;
+}
ReporterBase::~ReporterBase() {}
@@ -96,7 +103,7 @@ void ReporterBase::message(const messaging::Message& m) {
if (++batchCount == wantBatch) {
header();
batch->report(out);
- out << std::endl;
+ out << endl;
batch = create();
batchCount = 0;
}
@@ -107,14 +114,14 @@ void ReporterBase::message(const messaging::Message& m) {
void ReporterBase::report() {
header();
overall->report(out);
- out << std::endl;
+ out << endl;
}
void ReporterBase::header() {
if (!headerPrinted) {
if (!overall.get()) overall = create();
overall->header(out);
- out << std::endl;
+ out << endl;
headerPrinted = true;
}
}
diff --git a/qpid/cpp/src/tests/qpid_cpp_benchmark b/qpid/cpp/src/tests/qpid_cpp_benchmark
new file mode 100755
index 0000000000..0b940dc30a
--- /dev/null
+++ b/qpid/cpp/src/tests/qpid_cpp_benchmark
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, time, qpid.messaging
+from threading import Thread
+from subprocess import Popen, PIPE, STDOUT
+
+op = optparse.OptionParser(usage="usage: %prog [options]",
+ description="simple performance benchmarks")
+op.add_option("-b", "--broker", default="127.0.0.1",
+ help="url of broker to connect to")
+op.add_option("-q", "--queues", default=1, type="int", metavar="N",
+ help="create N queues (default %default)")
+op.add_option("-s", "--senders", default=1, type="int", metavar="N",
+ help="start N senders per queue (default %default)")
+op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
+ help="start N receivers per queue (default %default)")
+op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
+ help="send N messages per sender (default %default)")
+op.add_option("--queue-name", default="benchmark",
+ help="base name for queues (default %default)")
+op.add_option("--send-rate", default=0, metavar="R",
+ help="send rate limited to R messages/second, 0 means no limit")
+
+def start_receive(queue, opts):
+ return Popen(["qpid_receive",
+ "-b", opts.broker,
+ "-a", "%s;{create:always}"%(queue),
+ "--forever",
+ "--print-content=no",
+ "--report-total",
+ "--rate %s"%(opts.send_rate)],
+ stdout=PIPE, stderr=STDOUT)
+
+def start_send(queue, opts):
+ return Popen(["qpid_send",
+ "-b", opts.broker,
+ "-a", queue,
+ "--count", str(opts.messages),
+ "--send-eos", str(opts.receivers),
+ "--content", "benchmark",
+ "--report-total"],
+ stdout=PIPE, stderr=STDOUT)
+
+def wait_for_output(p):
+ out,err=p.communicate()
+ if p.returncode != 0: raise Exception("ERROR:\n%s"%(out))
+ return out
+
+def delete_queues(queues, broker):
+ c = qpid.messaging.Connection(broker)
+ c.open()
+ for q in queues:
+ try: s = c.session().sender("%s;{delete:always}"%(q))
+ except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue"
+ c.close()
+
+def wait_for_queues(queues, broker):
+ c = qpid.messaging.Connection(broker)
+ c.open()
+ s = c.session()
+ while True:
+ try:
+ for q in queues: s.sender(q)
+ break
+ except: pass
+ c.close()
+
+def skip_first_line(text): return "\n".join(text.split("\n")[1:])
+
+def print_output(processes):
+ print wait_for_output(processes.pop(0)),
+ for p in processes: print skip_first_line(wait_for_output(p)),
+
+def main():
+ opts, args = op.parse_args()
+ queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
+ delete_queues(queues, opts.broker)
+ receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)]
+ wait_for_queues(queues, opts.broker) # Wait for receivers to be ready
+ senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)]
+ print "Send"
+ print_output(senders)
+ print "\nReceive"
+ print_output(receivers)
+ delete_queues(queues, opts.broker)
+
+if __name__ == "__main__": main()
+