summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-05-09 16:19:36 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-05-09 16:19:36 +0000
commitb72b71c60b33c5496b2fdc9df32185e535dde147 (patch)
tree4a5a6abf8147bcc753af60594ebd61e134a5dfa3 /qpid
parent11fd2707e426292b8d6462d36d911171bd3a48a2 (diff)
downloadqpid-python-b72b71c60b33c5496b2fdc9df32185e535dde147.tar.gz
QPID-3941 This is a carbon copy of qpid-cpp-benchmark with minor
modifications to comment out the broker option (as the URL formats are different) and rearragined the order of args passed. Currently there is an issue of parsing the receiver output. Other than that it works without any changes. We need to move this file as qpid-benchmark and add options to allow running either c++ or jms/java versions or mix and match them. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336280 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rwxr-xr-xqpid/java/tools/bin/qpid-jms-benchmark316
1 files changed, 316 insertions, 0 deletions
diff --git a/qpid/java/tools/bin/qpid-jms-benchmark b/qpid/java/tools/bin/qpid-jms-benchmark
new file mode 100755
index 0000000000..3d712a27dc
--- /dev/null
+++ b/qpid/java/tools/bin/qpid-jms-benchmark
@@ -0,0 +1,316 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, time, qpid.messaging, re
+from threading import Thread
+from subprocess import Popen, PIPE, STDOUT
+
+op = optparse.OptionParser(usage="usage: %prog [options]",
+ description="simple performance benchmarks")
+op.add_option("-b", "--broker", default=[], action="append", type="str",
+ help="url of broker(s) to connect to, round robin on multiple brokers")
+op.add_option("-c", "--client-host", default=[], action="append", type="str",
+ help="host(s) to run clients on via ssh, round robin on mulple hosts")
+op.add_option("-q", "--queues", default=1, type="int", metavar="N",
+ help="create N queues (default %default)")
+op.add_option("-s", "--senders", default=1, type="int", metavar="N",
+ help="start N senders per queue (default %default)")
+op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
+ help="start N receivers per queue (default %default)")
+op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
+ help="send N messages per sender (default %default)")
+op.add_option("--queue-name", default="benchmark", metavar="NAME",
+ help="base name for queues (default %default)")
+op.add_option("--send-rate", default=0, metavar="N",
+ help="send rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--receive-rate", default=0, metavar="N",
+ help="receive rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
+ help="message size in bytes (default %default)")
+op.add_option("--ack-frequency", default=100, metavar="N", type="int",
+ help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
+op.add_option("--no-report-header", dest="report_header", default=True,
+ action="store_false", help="don't print header on report")
+op.add_option("--summarize", default=False, action="store_true",
+ help="print summary statistics for multiple senders/receivers: total throughput, average latency")
+op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
+op.add_option("--send-option", default=[], action="append", type="str",
+ help="Additional option for sending addresses")
+op.add_option("--receive-option", default=[], action="append", type="str",
+ help="Additional option for receiving addresses")
+op.add_option("--create-option", default=[], action="append", type="str",
+ help="Additional option for creating addresses")
+op.add_option("--send-arg", default=[], action="append", type="str",
+ help="Additional argument for qpid-send")
+op.add_option("--receive-arg", default=[], action="append", type="str",
+ help="Additional argument for qpid-receive")
+op.add_option("--no-timestamp", dest="timestamp", default=True,
+ action="store_false", help="don't add a timestamp, no latency results")
+op.add_option("--sequence", dest="sequence", default=False,
+ action="store_true", help="add a sequence number to each message")
+op.add_option("--connection-options", type="str",
+ help="Connection options for senders & receivers")
+op.add_option("--durable", default=False, action="store_true",
+ help="Use durable queues and messages")
+op.add_option("--save-received", default=False, action="store_true",
+ help="Save received message content to files <queuename>-receiver-<n>.msg")
+op.add_option("--verbose", default=False, action="store_true",
+ help="Show commands executed")
+op.add_option("--no-delete", default=False, action="store_true",
+ help="Don't delete the test queues.")
+op.add_option("--fill-drain", default=False, action="store_true",
+ help="First fill the queues, then drain them")
+
+single_quote_re = re.compile("'")
+def posix_quote(string):
+ """ Quote a string for use as an argument in a posix shell"""
+ return "'" + single_quote_re.sub("\\'", string) + "'";
+
+def ssh_command(host, command):
+ """ Convert command into an ssh command on host with quoting"""
+ return ["ssh", host] + [posix_quote(arg) for arg in command]
+
+class Clients:
+ def __init__(self): self.clients=[]
+
+ def add(self, client):
+ self.clients.append(client)
+ return client
+
+ def kill(self):
+ for c in self.clients:
+ try: c.kill()
+ except: pass
+
+class PopenCommand(Popen):
+ """Like Popen but you can query for the command"""
+ def __init__(self, command, *args, **kwargs):
+ self.command = command
+ Popen.__init__(self, command, *args, **kwargs)
+
+clients = Clients()
+
+def start_receive(queue, index, opts, ready_queue, broker, host):
+ address_opts=opts.receive_option
+ if opts.durable: address_opts += ["node:{durable:true}"]
+ address="%s;{%s}"%(queue,",".join(address_opts))
+ msg_total=opts.senders*opts.messages
+ messages = msg_total/opts.receivers;
+ if (index < msg_total%opts.receivers): messages += 1
+ if (messages == 0): return None
+ command = ["qpid-jms-receive",
+ #"-b", broker,
+ "--ready-address", "benchmark-ready;{create:always}",
+ "-a", address,
+ "-m", str(messages),
+ "--forever",
+ "--print-content=no",
+ # "--receive-rate", str(opts.receive_rate),
+ "--report-total",
+ "--ack-frequency", str(opts.ack_frequency),
+ # "--ready-address", "%s;{create:always}"%ready_queue,
+ "--report-header=no -v"
+ ]
+ if opts.save_received:
+ command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
+ command += opts.receive_arg
+ if opts.connection_options:
+ command += ["--connection-options",opts.connection_options]
+ if host: command = ssh_command(host, command)
+ if opts.verbose: print "Receiver: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def start_send(queue, opts, broker, host):
+ address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
+ command = ["qpid-jms-send",
+ #"-b", broker,
+ "-a", address,
+ "--messages", str(opts.messages),
+ "--content-size", str(opts.content_size),
+ "--send-rate", str(opts.send_rate),
+ "--report-total",
+ "--report-header=no",
+ "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
+ "--sequence=%s"%(opts.sequence and "yes" or "no"),
+ "--durable", str(opts.durable)
+ ]
+ command += opts.send_arg
+ if opts.connection_options:
+ command += ["--connection-options",opts.connection_options]
+ if host: command = ssh_command(host, command)
+ if opts.verbose: print "Sender: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def error_msg(out, err):
+ return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
+
+def first_line(p):
+ out,err=p.communicate()
+ if p.returncode != 0:
+ raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
+
+ print str(out)
+ print str(err)
+ return out.split("\n")[0]
+
+def recreate_queues(queues, brokers, no_delete, opts):
+ c = qpid.messaging.Connection(brokers[0])
+ c.open()
+ s = c.session()
+ for q in queues:
+ if not no_delete:
+ try: s.sender("%s;{delete:always}"%(q)).close()
+ except qpid.messaging.exceptions.NotFound: pass
+ address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
+ if opts.verbose: print "Creating", address
+ s.sender(address)
+ c.close()
+
+def print_header(timestamp):
+ if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
+ else: latency_header=""
+ print "send-tp\trecv-tp%s"%latency_header
+
+def parse(parser, lines): # Parse sender/receiver output
+ return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
+
+def parse_senders(senders):
+ return parse([int],[first_line(p) for p in senders])
+
+def parse_receivers(receivers):
+ return parse([int,float,float,float],[first_line(p) for p in receivers if p])
+
+def print_data(send_stats, recv_stats, total_tp):
+ for send,recv in map(None, send_stats, recv_stats):
+ line=""
+ if send: line += "%d"%send[0]
+ if recv:
+ line += "\t%d"%recv[0]
+ if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ if total_tp is not None:
+ line += "\t%d"%total_tp
+ total_tp = None
+ print line
+
+def print_summary(send_stats, recv_stats, total_tp):
+ def avg(s): sum(s) / len(s)
+ send_tp = sum([l[0] for l in send_stats])
+ recv_tp = sum([l[0] for l in recv_stats])
+ summary = "%d\t%d"%(send_tp, recv_tp)
+ if recv_stats and len(recv_stats[0]) == 4:
+ l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
+ l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
+ l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
+ summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
+ summary += "\t%d"%total_tp
+ print summary
+
+
+class ReadyReceiver:
+ """A receiver for ready messages"""
+ def __init__(self, queue, broker):
+ self.connection = qpid.messaging.Connection(broker)
+ self.connection.open()
+ self.receiver = self.connection.session().receiver(
+ "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
+ self.receiver.session.sync()
+ self.timeout=10
+
+ def wait(self, receivers):
+ try:
+ for i in receivers: self.receiver.fetch(self.timeout)
+ self.connection.close()
+ except qpid.messaging.Empty:
+ for r in receivers:
+ if (r.poll() is not None):
+ out,err=r.communicate()
+ raise Exception("Receiver error: %s\n%s" %
+ (" ".join(r.command), error_msg(out,err)))
+ raise Exception("Timed out waiting for receivers to be ready")
+
+def flatten(l):
+ return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
+
+class RoundRobin:
+ def __init__(self,items):
+ self.items = items
+ self.index = 0
+
+ def next(self):
+ if not self.items: return None
+ ret = self.items[self.index]
+ self.index = (self.index+1)%len(self.items)
+ return ret
+
+def main():
+ opts, args = op.parse_args()
+ opts.client_host = flatten(opts.client_host)
+ if not opts.broker:
+ if opts.client_host:
+ raise Exception("--broker must be specified if --client_host is.")
+ opts.broker = ["127.0.0.1"] # Deafult to local broker
+ opts.broker = flatten(opts.broker)
+ brokers = RoundRobin(opts.broker)
+ client_hosts = RoundRobin(opts.client_host)
+ send_out = ""
+ receive_out = ""
+ ready_queue="%s-ready"%(opts.queue_name)
+ queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
+ try:
+ for i in xrange(opts.repeat):
+ recreate_queues(queues, opts.broker, opts.no_delete, opts)
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+
+ def start_receivers():
+ return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers) ]
+
+
+ def start_senders():
+ return [ start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders) ]
+
+ if opts.report_header and i == 0: print_header(opts.timestamp)
+
+ if opts.fill_drain:
+ # First fill the queues, then drain them
+ start = time.time()
+ senders = start_senders()
+ for p in senders: p.wait()
+ receivers = start_receivers()
+ for p in receivers: p.wait()
+ else:
+ # Run senders and receivers in parallel
+ receivers = start_receivers()
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready
+ start = time.time()
+ senders = start_senders()
+ for p in senders + receivers: p.wait()
+
+ total_sent = opts.queues * opts.senders * opts.messages
+ total_tp = total_sent / (time.time()-start)
+ #send_stats=parse_senders(senders)
+ recv_stats=parse_receivers(receivers)
+ #if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
+ #else: print_data(send_stats, recv_stats, total_tp)
+ finally: clients.kill() # No strays
+
+if __name__ == "__main__": main()
+