summaryrefslogtreecommitdiff
path: root/java/tools/bin/qpid-jms-benchmark
diff options
context:
space:
mode:
Diffstat (limited to 'java/tools/bin/qpid-jms-benchmark')
-rwxr-xr-xjava/tools/bin/qpid-jms-benchmark316
1 files changed, 316 insertions, 0 deletions
diff --git a/java/tools/bin/qpid-jms-benchmark b/java/tools/bin/qpid-jms-benchmark
new file mode 100755
index 0000000000..3d712a27dc
--- /dev/null
+++ b/java/tools/bin/qpid-jms-benchmark
@@ -0,0 +1,316 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, time, qpid.messaging, re
+from threading import Thread
+from subprocess import Popen, PIPE, STDOUT
+
+op = optparse.OptionParser(usage="usage: %prog [options]",
+ description="simple performance benchmarks")
+op.add_option("-b", "--broker", default=[], action="append", type="str",
+ help="url of broker(s) to connect to, round robin on multiple brokers")
+op.add_option("-c", "--client-host", default=[], action="append", type="str",
+ help="host(s) to run clients on via ssh, round robin on mulple hosts")
+op.add_option("-q", "--queues", default=1, type="int", metavar="N",
+ help="create N queues (default %default)")
+op.add_option("-s", "--senders", default=1, type="int", metavar="N",
+ help="start N senders per queue (default %default)")
+op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
+ help="start N receivers per queue (default %default)")
+op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
+ help="send N messages per sender (default %default)")
+op.add_option("--queue-name", default="benchmark", metavar="NAME",
+ help="base name for queues (default %default)")
+op.add_option("--send-rate", default=0, metavar="N",
+ help="send rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--receive-rate", default=0, metavar="N",
+ help="receive rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
+ help="message size in bytes (default %default)")
+op.add_option("--ack-frequency", default=100, metavar="N", type="int",
+ help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
+op.add_option("--no-report-header", dest="report_header", default=True,
+ action="store_false", help="don't print header on report")
+op.add_option("--summarize", default=False, action="store_true",
+ help="print summary statistics for multiple senders/receivers: total throughput, average latency")
+op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
+op.add_option("--send-option", default=[], action="append", type="str",
+ help="Additional option for sending addresses")
+op.add_option("--receive-option", default=[], action="append", type="str",
+ help="Additional option for receiving addresses")
+op.add_option("--create-option", default=[], action="append", type="str",
+ help="Additional option for creating addresses")
+op.add_option("--send-arg", default=[], action="append", type="str",
+ help="Additional argument for qpid-send")
+op.add_option("--receive-arg", default=[], action="append", type="str",
+ help="Additional argument for qpid-receive")
+op.add_option("--no-timestamp", dest="timestamp", default=True,
+ action="store_false", help="don't add a timestamp, no latency results")
+op.add_option("--sequence", dest="sequence", default=False,
+ action="store_true", help="add a sequence number to each message")
+op.add_option("--connection-options", type="str",
+ help="Connection options for senders & receivers")
+op.add_option("--durable", default=False, action="store_true",
+ help="Use durable queues and messages")
+op.add_option("--save-received", default=False, action="store_true",
+ help="Save received message content to files <queuename>-receiver-<n>.msg")
+op.add_option("--verbose", default=False, action="store_true",
+ help="Show commands executed")
+op.add_option("--no-delete", default=False, action="store_true",
+ help="Don't delete the test queues.")
+op.add_option("--fill-drain", default=False, action="store_true",
+ help="First fill the queues, then drain them")
+
+single_quote_re = re.compile("'")
+def posix_quote(string):
+ """ Quote a string for use as an argument in a posix shell"""
+ return "'" + single_quote_re.sub("\\'", string) + "'";
+
+def ssh_command(host, command):
+ """ Convert command into an ssh command on host with quoting"""
+ return ["ssh", host] + [posix_quote(arg) for arg in command]
+
+class Clients:
+ def __init__(self): self.clients=[]
+
+ def add(self, client):
+ self.clients.append(client)
+ return client
+
+ def kill(self):
+ for c in self.clients:
+ try: c.kill()
+ except: pass
+
+class PopenCommand(Popen):
+ """Like Popen but you can query for the command"""
+ def __init__(self, command, *args, **kwargs):
+ self.command = command
+ Popen.__init__(self, command, *args, **kwargs)
+
+clients = Clients()
+
+def start_receive(queue, index, opts, ready_queue, broker, host):
+ address_opts=opts.receive_option
+ if opts.durable: address_opts += ["node:{durable:true}"]
+ address="%s;{%s}"%(queue,",".join(address_opts))
+ msg_total=opts.senders*opts.messages
+ messages = msg_total/opts.receivers;
+ if (index < msg_total%opts.receivers): messages += 1
+ if (messages == 0): return None
+ command = ["qpid-jms-receive",
+ #"-b", broker,
+ "--ready-address", "benchmark-ready;{create:always}",
+ "-a", address,
+ "-m", str(messages),
+ "--forever",
+ "--print-content=no",
+ # "--receive-rate", str(opts.receive_rate),
+ "--report-total",
+ "--ack-frequency", str(opts.ack_frequency),
+ # "--ready-address", "%s;{create:always}"%ready_queue,
+ "--report-header=no -v"
+ ]
+ if opts.save_received:
+ command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
+ command += opts.receive_arg
+ if opts.connection_options:
+ command += ["--connection-options",opts.connection_options]
+ if host: command = ssh_command(host, command)
+ if opts.verbose: print "Receiver: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def start_send(queue, opts, broker, host):
+ address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
+ command = ["qpid-jms-send",
+ #"-b", broker,
+ "-a", address,
+ "--messages", str(opts.messages),
+ "--content-size", str(opts.content_size),
+ "--send-rate", str(opts.send_rate),
+ "--report-total",
+ "--report-header=no",
+ "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
+ "--sequence=%s"%(opts.sequence and "yes" or "no"),
+ "--durable", str(opts.durable)
+ ]
+ command += opts.send_arg
+ if opts.connection_options:
+ command += ["--connection-options",opts.connection_options]
+ if host: command = ssh_command(host, command)
+ if opts.verbose: print "Sender: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def error_msg(out, err):
+ return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
+
+def first_line(p):
+ out,err=p.communicate()
+ if p.returncode != 0:
+ raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
+
+ print str(out)
+ print str(err)
+ return out.split("\n")[0]
+
+def recreate_queues(queues, brokers, no_delete, opts):
+ c = qpid.messaging.Connection(brokers[0])
+ c.open()
+ s = c.session()
+ for q in queues:
+ if not no_delete:
+ try: s.sender("%s;{delete:always}"%(q)).close()
+ except qpid.messaging.exceptions.NotFound: pass
+ address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
+ if opts.verbose: print "Creating", address
+ s.sender(address)
+ c.close()
+
+def print_header(timestamp):
+ if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
+ else: latency_header=""
+ print "send-tp\trecv-tp%s"%latency_header
+
+def parse(parser, lines): # Parse sender/receiver output
+ return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
+
+def parse_senders(senders):
+ return parse([int],[first_line(p) for p in senders])
+
+def parse_receivers(receivers):
+ return parse([int,float,float,float],[first_line(p) for p in receivers if p])
+
+def print_data(send_stats, recv_stats, total_tp):
+ for send,recv in map(None, send_stats, recv_stats):
+ line=""
+ if send: line += "%d"%send[0]
+ if recv:
+ line += "\t%d"%recv[0]
+ if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ if total_tp is not None:
+ line += "\t%d"%total_tp
+ total_tp = None
+ print line
+
+def print_summary(send_stats, recv_stats, total_tp):
+ def avg(s): sum(s) / len(s)
+ send_tp = sum([l[0] for l in send_stats])
+ recv_tp = sum([l[0] for l in recv_stats])
+ summary = "%d\t%d"%(send_tp, recv_tp)
+ if recv_stats and len(recv_stats[0]) == 4:
+ l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
+ l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
+ l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
+ summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
+ summary += "\t%d"%total_tp
+ print summary
+
+
+class ReadyReceiver:
+ """A receiver for ready messages"""
+ def __init__(self, queue, broker):
+ self.connection = qpid.messaging.Connection(broker)
+ self.connection.open()
+ self.receiver = self.connection.session().receiver(
+ "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
+ self.receiver.session.sync()
+ self.timeout=10
+
+ def wait(self, receivers):
+ try:
+ for i in receivers: self.receiver.fetch(self.timeout)
+ self.connection.close()
+ except qpid.messaging.Empty:
+ for r in receivers:
+ if (r.poll() is not None):
+ out,err=r.communicate()
+ raise Exception("Receiver error: %s\n%s" %
+ (" ".join(r.command), error_msg(out,err)))
+ raise Exception("Timed out waiting for receivers to be ready")
+
+def flatten(l):
+ return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
+
+class RoundRobin:
+ def __init__(self,items):
+ self.items = items
+ self.index = 0
+
+ def next(self):
+ if not self.items: return None
+ ret = self.items[self.index]
+ self.index = (self.index+1)%len(self.items)
+ return ret
+
+def main():
+ opts, args = op.parse_args()
+ opts.client_host = flatten(opts.client_host)
+ if not opts.broker:
+ if opts.client_host:
+ raise Exception("--broker must be specified if --client_host is.")
+ opts.broker = ["127.0.0.1"] # Deafult to local broker
+ opts.broker = flatten(opts.broker)
+ brokers = RoundRobin(opts.broker)
+ client_hosts = RoundRobin(opts.client_host)
+ send_out = ""
+ receive_out = ""
+ ready_queue="%s-ready"%(opts.queue_name)
+ queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
+ try:
+ for i in xrange(opts.repeat):
+ recreate_queues(queues, opts.broker, opts.no_delete, opts)
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+
+ def start_receivers():
+ return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers) ]
+
+
+ def start_senders():
+ return [ start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders) ]
+
+ if opts.report_header and i == 0: print_header(opts.timestamp)
+
+ if opts.fill_drain:
+ # First fill the queues, then drain them
+ start = time.time()
+ senders = start_senders()
+ for p in senders: p.wait()
+ receivers = start_receivers()
+ for p in receivers: p.wait()
+ else:
+ # Run senders and receivers in parallel
+ receivers = start_receivers()
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready
+ start = time.time()
+ senders = start_senders()
+ for p in senders + receivers: p.wait()
+
+ total_sent = opts.queues * opts.senders * opts.messages
+ total_tp = total_sent / (time.time()-start)
+ #send_stats=parse_senders(senders)
+ recv_stats=parse_receivers(receivers)
+ #if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
+ #else: print_data(send_stats, recv_stats, total_tp)
+ finally: clients.kill() # No strays
+
+if __name__ == "__main__": main()
+