diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-05-09 16:19:36 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-05-09 16:19:36 +0000 |
commit | b72b71c60b33c5496b2fdc9df32185e535dde147 (patch) | |
tree | 4a5a6abf8147bcc753af60594ebd61e134a5dfa3 /qpid | |
parent | 11fd2707e426292b8d6462d36d911171bd3a48a2 (diff) | |
download | qpid-python-b72b71c60b33c5496b2fdc9df32185e535dde147.tar.gz |
QPID-3941 This is a carbon copy of qpid-cpp-benchmark with minor
modifications to comment out the broker option (as the URL formats are
different) and rearragined the order of args passed. Currently there is
an issue of parsing the receiver output. Other than that it works
without any changes.
We need to move this file as qpid-benchmark and add options to allow
running either c++ or jms/java versions or mix and match them.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336280 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rwxr-xr-x | qpid/java/tools/bin/qpid-jms-benchmark | 316 |
1 files changed, 316 insertions, 0 deletions
diff --git a/qpid/java/tools/bin/qpid-jms-benchmark b/qpid/java/tools/bin/qpid-jms-benchmark new file mode 100755 index 0000000000..3d712a27dc --- /dev/null +++ b/qpid/java/tools/bin/qpid-jms-benchmark @@ -0,0 +1,316 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time, qpid.messaging, re +from threading import Thread +from subprocess import Popen, PIPE, STDOUT + +op = optparse.OptionParser(usage="usage: %prog [options]", + description="simple performance benchmarks") +op.add_option("-b", "--broker", default=[], action="append", type="str", + help="url of broker(s) to connect to, round robin on multiple brokers") +op.add_option("-c", "--client-host", default=[], action="append", type="str", + help="host(s) to run clients on via ssh, round robin on mulple hosts") +op.add_option("-q", "--queues", default=1, type="int", metavar="N", + help="create N queues (default %default)") +op.add_option("-s", "--senders", default=1, type="int", metavar="N", + help="start N senders per queue (default %default)") +op.add_option("-r", "--receivers", default=1, type="int", metavar="N", + help="start N receivers per queue (default %default)") +op.add_option("-m", "--messages", default=100000, type="int", metavar="N", + help="send N messages per sender (default %default)") +op.add_option("--queue-name", default="benchmark", metavar="NAME", + help="base name for queues (default %default)") +op.add_option("--send-rate", default=0, metavar="N", + help="send rate limited to N messages/second, 0 means no limit (default %default)") +op.add_option("--receive-rate", default=0, metavar="N", + help="receive rate limited to N messages/second, 0 means no limit (default %default)") +op.add_option("--content-size", default=1024, type="int", metavar="BYTES", + help="message size in bytes (default %default)") +op.add_option("--ack-frequency", default=100, metavar="N", type="int", + help="receiver ack's every N messages, 0 means unconfirmed (default %default)") +op.add_option("--no-report-header", dest="report_header", default=True, + action="store_false", help="don't print header on report") +op.add_option("--summarize", default=False, action="store_true", + help="print summary statistics for multiple senders/receivers: total throughput, average latency") +op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") +op.add_option("--send-option", default=[], action="append", type="str", + help="Additional option for sending addresses") +op.add_option("--receive-option", default=[], action="append", type="str", + help="Additional option for receiving addresses") +op.add_option("--create-option", default=[], action="append", type="str", + help="Additional option for creating addresses") +op.add_option("--send-arg", default=[], action="append", type="str", + help="Additional argument for qpid-send") +op.add_option("--receive-arg", default=[], action="append", type="str", + help="Additional argument for qpid-receive") +op.add_option("--no-timestamp", dest="timestamp", default=True, + action="store_false", help="don't add a timestamp, no latency results") +op.add_option("--sequence", dest="sequence", default=False, + action="store_true", help="add a sequence number to each message") +op.add_option("--connection-options", type="str", + help="Connection options for senders & receivers") +op.add_option("--durable", default=False, action="store_true", + help="Use durable queues and messages") +op.add_option("--save-received", default=False, action="store_true", + help="Save received message content to files <queuename>-receiver-<n>.msg") +op.add_option("--verbose", default=False, action="store_true", + help="Show commands executed") +op.add_option("--no-delete", default=False, action="store_true", + help="Don't delete the test queues.") +op.add_option("--fill-drain", default=False, action="store_true", + help="First fill the queues, then drain them") + +single_quote_re = re.compile("'") +def posix_quote(string): + """ Quote a string for use as an argument in a posix shell""" + return "'" + single_quote_re.sub("\\'", string) + "'"; + +def ssh_command(host, command): + """ Convert command into an ssh command on host with quoting""" + return ["ssh", host] + [posix_quote(arg) for arg in command] + +class Clients: + def __init__(self): self.clients=[] + + def add(self, client): + self.clients.append(client) + return client + + def kill(self): + for c in self.clients: + try: c.kill() + except: pass + +class PopenCommand(Popen): + """Like Popen but you can query for the command""" + def __init__(self, command, *args, **kwargs): + self.command = command + Popen.__init__(self, command, *args, **kwargs) + +clients = Clients() + +def start_receive(queue, index, opts, ready_queue, broker, host): + address_opts=opts.receive_option + if opts.durable: address_opts += ["node:{durable:true}"] + address="%s;{%s}"%(queue,",".join(address_opts)) + msg_total=opts.senders*opts.messages + messages = msg_total/opts.receivers; + if (index < msg_total%opts.receivers): messages += 1 + if (messages == 0): return None + command = ["qpid-jms-receive", + #"-b", broker, + "--ready-address", "benchmark-ready;{create:always}", + "-a", address, + "-m", str(messages), + "--forever", + "--print-content=no", + # "--receive-rate", str(opts.receive_rate), + "--report-total", + "--ack-frequency", str(opts.ack_frequency), + # "--ready-address", "%s;{create:always}"%ready_queue, + "--report-header=no -v" + ] + if opts.save_received: + command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] + command += opts.receive_arg + if opts.connection_options: + command += ["--connection-options",opts.connection_options] + if host: command = ssh_command(host, command) + if opts.verbose: print "Receiver: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) + +def start_send(queue, opts, broker, host): + address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) + command = ["qpid-jms-send", + #"-b", broker, + "-a", address, + "--messages", str(opts.messages), + "--content-size", str(opts.content_size), + "--send-rate", str(opts.send_rate), + "--report-total", + "--report-header=no", + "--timestamp=%s"%(opts.timestamp and "yes" or "no"), + "--sequence=%s"%(opts.sequence and "yes" or "no"), + "--durable", str(opts.durable) + ] + command += opts.send_arg + if opts.connection_options: + command += ["--connection-options",opts.connection_options] + if host: command = ssh_command(host, command) + if opts.verbose: print "Sender: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) + +def error_msg(out, err): + return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) + +def first_line(p): + out,err=p.communicate() + if p.returncode != 0: + raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) + + print str(out) + print str(err) + return out.split("\n")[0] + +def recreate_queues(queues, brokers, no_delete, opts): + c = qpid.messaging.Connection(brokers[0]) + c.open() + s = c.session() + for q in queues: + if not no_delete: + try: s.sender("%s;{delete:always}"%(q)).close() + except qpid.messaging.exceptions.NotFound: pass + address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) + if opts.verbose: print "Creating", address + s.sender(address) + c.close() + +def print_header(timestamp): + if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" + else: latency_header="" + print "send-tp\trecv-tp%s"%latency_header + +def parse(parser, lines): # Parse sender/receiver output + return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] + +def parse_senders(senders): + return parse([int],[first_line(p) for p in senders]) + +def parse_receivers(receivers): + return parse([int,float,float,float],[first_line(p) for p in receivers if p]) + +def print_data(send_stats, recv_stats, total_tp): + for send,recv in map(None, send_stats, recv_stats): + line="" + if send: line += "%d"%send[0] + if recv: + line += "\t%d"%recv[0] + if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + if total_tp is not None: + line += "\t%d"%total_tp + total_tp = None + print line + +def print_summary(send_stats, recv_stats, total_tp): + def avg(s): sum(s) / len(s) + send_tp = sum([l[0] for l in send_stats]) + recv_tp = sum([l[0] for l in recv_stats]) + summary = "%d\t%d"%(send_tp, recv_tp) + if recv_stats and len(recv_stats[0]) == 4: + l_min = sum(l[1] for l in recv_stats)/len(recv_stats) + l_max = sum(l[2] for l in recv_stats)/len(recv_stats) + l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) + summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) + summary += "\t%d"%total_tp + print summary + + +class ReadyReceiver: + """A receiver for ready messages""" + def __init__(self, queue, broker): + self.connection = qpid.messaging.Connection(broker) + self.connection.open() + self.receiver = self.connection.session().receiver( + "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) + self.receiver.session.sync() + self.timeout=10 + + def wait(self, receivers): + try: + for i in receivers: self.receiver.fetch(self.timeout) + self.connection.close() + except qpid.messaging.Empty: + for r in receivers: + if (r.poll() is not None): + out,err=r.communicate() + raise Exception("Receiver error: %s\n%s" % + (" ".join(r.command), error_msg(out,err))) + raise Exception("Timed out waiting for receivers to be ready") + +def flatten(l): + return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), []) + +class RoundRobin: + def __init__(self,items): + self.items = items + self.index = 0 + + def next(self): + if not self.items: return None + ret = self.items[self.index] + self.index = (self.index+1)%len(self.items) + return ret + +def main(): + opts, args = op.parse_args() + opts.client_host = flatten(opts.client_host) + if not opts.broker: + if opts.client_host: + raise Exception("--broker must be specified if --client_host is.") + opts.broker = ["127.0.0.1"] # Deafult to local broker + opts.broker = flatten(opts.broker) + brokers = RoundRobin(opts.broker) + client_hosts = RoundRobin(opts.client_host) + send_out = "" + receive_out = "" + ready_queue="%s-ready"%(opts.queue_name) + queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] + try: + for i in xrange(opts.repeat): + recreate_queues(queues, opts.broker, opts.no_delete, opts) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) + + def start_receivers(): + return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers) ] + + + def start_senders(): + return [ start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders) ] + + if opts.report_header and i == 0: print_header(opts.timestamp) + + if opts.fill_drain: + # First fill the queues, then drain them + start = time.time() + senders = start_senders() + for p in senders: p.wait() + receivers = start_receivers() + for p in receivers: p.wait() + else: + # Run senders and receivers in parallel + receivers = start_receivers() + ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready + start = time.time() + senders = start_senders() + for p in senders + receivers: p.wait() + + total_sent = opts.queues * opts.senders * opts.messages + total_tp = total_sent / (time.time()-start) + #send_stats=parse_senders(senders) + recv_stats=parse_receivers(receivers) + #if opts.summarize: print_summary(send_stats, recv_stats, total_tp) + #else: print_data(send_stats, recv_stats, total_tp) + finally: clients.kill() # No strays + +if __name__ == "__main__": main() + |