diff options
author | Alan Conway <aconway@apache.org> | 2010-04-09 16:06:02 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-04-09 16:06:02 +0000 |
commit | a3ad637f4960a7508683f35a8be3b256c941a92a (patch) | |
tree | 764a6f8f0441947ef7c54d62c8679d2609fe74d8 | |
parent | 562660196b4d33ed52fe7788592dc757c527719e (diff) | |
download | qpid-python-a3ad637f4960a7508683f35a8be3b256c941a92a.tar.gz |
Script to run performance benchmarks using qpid_send and qpid_receive.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@932479 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/Statistics.cpp | 39 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid_cpp_benchmark | 106 |
2 files changed, 129 insertions, 16 deletions
diff --git a/qpid/cpp/src/tests/Statistics.cpp b/qpid/cpp/src/tests/Statistics.cpp index 3c5a3a6d4e..bcc646ce46 100644 --- a/qpid/cpp/src/tests/Statistics.cpp +++ b/qpid/cpp/src/tests/Statistics.cpp @@ -21,10 +21,14 @@ #include "Statistics.h" #include <qpid/messaging/Message.h> #include <ostream> +#include <iomanip> namespace qpid { namespace tests { +using namespace std; +const int WIDTH=10; + Statistic::~Statistic() {} Throughput::Throughput() : messages(0), started(false) {} @@ -37,19 +41,19 @@ void Throughput::message(const messaging::Message&) { } } -void Throughput::header(std::ostream& o) const { - o << "msg/sec"; +void Throughput::header(ostream& o) const { + o << setw(WIDTH) << "msg/sec"; } -void Throughput::report(std::ostream& o) const { +void Throughput::report(ostream& o) const { double elapsed(int64_t(sys::Duration(start, sys::now()))/double(sys::TIME_SEC)); - o << messages/elapsed; + o << setw(WIDTH) << messages/elapsed; } ThroughputAndLatency::ThroughputAndLatency() : total(0), - min(std::numeric_limits<double>::max()), - max(std::numeric_limits<double>::min()) + min(numeric_limits<double>::max()), + max(numeric_limits<double>::min()) {} void ThroughputAndLatency::message(const messaging::Message& m) { @@ -67,22 +71,25 @@ void ThroughputAndLatency::message(const messaging::Message& m) { } } -void ThroughputAndLatency::header(std::ostream& o) const { +void ThroughputAndLatency::header(ostream& o) const { Throughput::header(o); - o << " latency(ms)min max avg"; + o << setw(3*WIDTH) << "latency(ms): min max avg"; } -void ThroughputAndLatency::report(std::ostream& o) const { +void ThroughputAndLatency::report(ostream& o) const { Throughput::report(o); - o << " "; if (messages) - o << min << " " << max << " " << total/messages; + o << setw(WIDTH) << min << setw(WIDTH) << max << setw(WIDTH) << total/messages; else o << "Can't compute latency for 0 messages."; } -ReporterBase::ReporterBase(std::ostream& o, int batch) - : wantBatch(batch), batchCount(0), headerPrinted(false), out(o) {} +ReporterBase::ReporterBase(ostream& o, int batch) + : wantBatch(batch), batchCount(0), headerPrinted(false), out(o) +{ + o.precision(2); + o << fixed; +} ReporterBase::~ReporterBase() {} @@ -96,7 +103,7 @@ void ReporterBase::message(const messaging::Message& m) { if (++batchCount == wantBatch) { header(); batch->report(out); - out << std::endl; + out << endl; batch = create(); batchCount = 0; } @@ -107,14 +114,14 @@ void ReporterBase::message(const messaging::Message& m) { void ReporterBase::report() { header(); overall->report(out); - out << std::endl; + out << endl; } void ReporterBase::header() { if (!headerPrinted) { if (!overall.get()) overall = create(); overall->header(out); - out << std::endl; + out << endl; headerPrinted = true; } } diff --git a/qpid/cpp/src/tests/qpid_cpp_benchmark b/qpid/cpp/src/tests/qpid_cpp_benchmark new file mode 100755 index 0000000000..0b940dc30a --- /dev/null +++ b/qpid/cpp/src/tests/qpid_cpp_benchmark @@ -0,0 +1,106 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time, qpid.messaging +from threading import Thread +from subprocess import Popen, PIPE, STDOUT + +op = optparse.OptionParser(usage="usage: %prog [options]", + description="simple performance benchmarks") +op.add_option("-b", "--broker", default="127.0.0.1", + help="url of broker to connect to") +op.add_option("-q", "--queues", default=1, type="int", metavar="N", + help="create N queues (default %default)") +op.add_option("-s", "--senders", default=1, type="int", metavar="N", + help="start N senders per queue (default %default)") +op.add_option("-r", "--receivers", default=1, type="int", metavar="N", + help="start N receivers per queue (default %default)") +op.add_option("-m", "--messages", default=100000, type="int", metavar="N", + help="send N messages per sender (default %default)") +op.add_option("--queue-name", default="benchmark", + help="base name for queues (default %default)") +op.add_option("--send-rate", default=0, metavar="R", + help="send rate limited to R messages/second, 0 means no limit") + +def start_receive(queue, opts): + return Popen(["qpid_receive", + "-b", opts.broker, + "-a", "%s;{create:always}"%(queue), + "--forever", + "--print-content=no", + "--report-total", + "--rate %s"%(opts.send_rate)], + stdout=PIPE, stderr=STDOUT) + +def start_send(queue, opts): + return Popen(["qpid_send", + "-b", opts.broker, + "-a", queue, + "--count", str(opts.messages), + "--send-eos", str(opts.receivers), + "--content", "benchmark", + "--report-total"], + stdout=PIPE, stderr=STDOUT) + +def wait_for_output(p): + out,err=p.communicate() + if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) + return out + +def delete_queues(queues, broker): + c = qpid.messaging.Connection(broker) + c.open() + for q in queues: + try: s = c.session().sender("%s;{delete:always}"%(q)) + except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue" + c.close() + +def wait_for_queues(queues, broker): + c = qpid.messaging.Connection(broker) + c.open() + s = c.session() + while True: + try: + for q in queues: s.sender(q) + break + except: pass + c.close() + +def skip_first_line(text): return "\n".join(text.split("\n")[1:]) + +def print_output(processes): + print wait_for_output(processes.pop(0)), + for p in processes: print skip_first_line(wait_for_output(p)), + +def main(): + opts, args = op.parse_args() + queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] + delete_queues(queues, opts.broker) + receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)] + wait_for_queues(queues, opts.broker) # Wait for receivers to be ready + senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)] + print "Send" + print_output(senders) + print "\nReceive" + print_output(receivers) + delete_queues(queues, opts.broker) + +if __name__ == "__main__": main() + |