diff options
author | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
commit | 32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch) | |
tree | 2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/cpp/src/tests/qpid-cpp-benchmark | |
parent | 116d91ad7825a98af36a869fc751206fbce0c59f (diff) | |
parent | f7e896076143de4572b4f1f67ef0765125f2498d (diff) | |
download | qpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz |
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark new file mode 100755 index 0000000000..2d5ec711fe --- /dev/null +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -0,0 +1,363 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time, re, os + +try: + import qpid_messaging as qm +except ImportError: + qpid_messaging = None + import qpid.messaging as qm + +from threading import Thread +from subprocess import Popen, PIPE, STDOUT + +op = optparse.OptionParser(usage="usage: %prog [options]", + description="simple performance benchmarks") +op.add_option("-b", "--broker", default=[], action="append", type="str", + help="url of broker(s) to connect to, round robin on multiple brokers") +op.add_option("-c", "--client-host", default=[], action="append", type="str", + help="host(s) to run clients on via ssh, round robin on mulple hosts") +op.add_option("-q", "--queues", default=1, type="int", metavar="N", + help="create N queues (default %default)") +op.add_option("-s", "--senders", default=1, type="int", metavar="N", + help="start N senders per queue (default %default)") +op.add_option("-r", "--receivers", default=1, type="int", metavar="N", + help="start N receivers per queue (default %default)") +op.add_option("-m", "--messages", default=100000, type="int", metavar="N", + help="send N messages per sender (default %default)") +op.add_option("--queue-name", default="benchmark", metavar="NAME", + help="base name for queues (default %default)") +op.add_option("--send-rate", default=0, metavar="N", + help="send rate limited to N messages/second, 0 means no limit (default %default)") +op.add_option("--receive-rate", default=0, metavar="N", + help="receive rate limited to N messages/second, 0 means no limit (default %default)") +op.add_option("--content-size", default=1024, type="int", metavar="BYTES", + help="message size in bytes (default %default)") +op.add_option("--ack-frequency", default=100, metavar="N", type="int", + help="receiver ack's every N messages, 0 means unconfirmed (default %default)") +op.add_option("--tx", default=0, metavar="N", type="int", + help="Transaction batch size, 0 means no transactions") +op.add_option("--no-report-header", dest="report_header", default=True, + action="store_false", help="don't print header on report") +op.add_option("--summarize", default=False, action="store_true", + help="print summary statistics for multiple senders/receivers: total throughput, average latency") +op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") +op.add_option("--send-option", default=[], action="append", type="str", + help="Additional option for sending addresses") +op.add_option("--receive-option", default=[], action="append", type="str", + help="Additional option for receiving addresses") +op.add_option("--create-option", default=[], action="append", type="str", + help="Additional option for creating addresses") +op.add_option("--send-arg", default=[], action="append", type="str", + help="Additional argument for qpid-send") +op.add_option("--receive-arg", default=[], action="append", type="str", + help="Additional argument for qpid-receive") +op.add_option("--no-timestamp", dest="timestamp", default=True, + action="store_false", help="don't add a timestamp, no latency results") +op.add_option("--sequence", dest="sequence", default=False, + action="store_true", help="add a sequence number to each message") +op.add_option("--connection-options", type="str", + help="Connection options for senders & receivers") +op.add_option("--durable", default=False, action="store_true", + help="Use durable queues and messages") +op.add_option("-t", "--timeout", default=1.0, type="float", metavar="SECONDS", + help="Timeout for fetch operations (default %default)") +op.add_option("--save-received", default=False, action="store_true", + help="Save received message content to files <queuename>-receiver-<n>.msg") +op.add_option("--verbose", default=False, action="store_true", + help="Show commands executed") +op.add_option("--fill-drain", default=False, action="store_true", + help="First fill the queues, then drain them") +op.add_option("--qpid-send-path", default="", type="str", metavar="PATH", + help="path to qpid-send binary") +op.add_option("--qpid-receive-path", default="", type="str", metavar="PATH", + help="path to qpid-receive binary") + +single_quote_re = re.compile("'") +def posix_quote(string): + """ Quote a string for use as an argument in a posix shell""" + return "'" + single_quote_re.sub("\\'", string) + "'"; + +def ssh_command(host, command): + """ Convert command into an ssh command on host with quoting""" + return ["ssh", host] + [posix_quote(arg) for arg in command] + +class Clients: + def __init__(self): self.clients=[] + + def add(self, client): + self.clients.append(client) + return client + + def kill(self): + for c in self.clients: + try: c.kill() + except: pass + +class PopenCommand(Popen): + """Like Popen but you can query for the command""" + def __init__(self, command, *args, **kwargs): + self.command = command + Popen.__init__(self, command, *args, **kwargs) + +clients = Clients() + +def start_receive(queue, index, opts, ready_queue, broker, host): + address_opts=opts.receive_option + if opts.durable: address_opts += ["node:{durable:true}"] + address="%s;{%s}"%(queue,",".join(address_opts)) + msg_total=opts.senders*opts.messages + messages = msg_total/opts.receivers; + if (index < msg_total%opts.receivers): messages += 1 + if (messages == 0): return None + command = [os.path.join(opts.qpid_receive_path, "qpid-receive"), + "-b", broker, + "-a", address, + "-m", str(messages), + "--forever", + "--print-content=no", + "--receive-rate", str(opts.receive_rate), + "--report-total", + "--ack-frequency", str(opts.ack_frequency), + "--ready-address", "%s;{create:always}"%ready_queue, + "--report-header=no", + "--tx=%s" % opts.tx + ] + if opts.save_received: + command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] + command += opts.receive_arg + if opts.connection_options: + command += ["--connection-options",opts.connection_options] + if host: command = ssh_command(host, command) + if opts.verbose: print "Receiver: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) + +def start_send(queue, opts, broker, host): + address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) + command = [os.path.join(opts.qpid_send_path, "qpid-send"), + "-b", broker, + "-a", address, + "--messages", str(opts.messages), + "--content-size", str(opts.content_size), + "--send-rate", str(opts.send_rate), + "--report-total", + "--report-header=no", + "--timestamp=%s"%(opts.timestamp and "yes" or "no"), + "--sequence=%s"%(opts.sequence and "yes" or "no"), + "--durable=%d" % opts.durable, + "--tx=%s" % opts.tx + ] + command += opts.send_arg + if opts.connection_options: + command += ["--connection-options",opts.connection_options] + if host: command = ssh_command(host, command) + if opts.verbose: print "Sender: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) + +def error_msg(out, err): + return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) + +def first_line(p): + out,err=p.communicate() + if p.returncode != 0: + raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) + return out.split("\n")[0] + +def connect(broker, opts): + if opts.connection_options: + copts = dict([kv.strip().split(":") for kv in opts.connection_options.strip("{}").split(",")]) + else: + copts = {} + return qm.Connection.establish(broker, **copts) + +def drain(queue, session, opts): + """ + Drain a queue to make sure it is empty. Throw away the messages. + """ + if opts.verbose: print "Draining", queue + r = session.receiver(queue, capacity=1000) + n = 0 + try: + while True: + # FIXME aconway 2014-11-21: activemq broker does not respect the drain flag + # so fetch on an empty queue will hang forever, use get with timeout instead. + # r.fetch(timeout=0) + m = qm.Message() + r.get(timeout=opts.timeout) + n += 1 + if n % 500 == 0: r.session.acknowledge() + r.session.acknowledge() + except qm.Empty: + pass + r.close() + if opts.verbose: print "Drained", queue, n + +def clear_queues(queues, brokers, opts): + c = connect(brokers[0], opts) + for q in queues: + s = c.session() + need_drain = False + try: + s.sender("%s;{delete:always}"%(q)).close() + if opts.verbose: print "Deleted", q + except qm.NotFound: + s = c.session() + except qm.AddressError: + need_drain = True # AMQP 1.0 does not support delete, drain instead. + s = c.session() + address_opts = ["create:always"] + if opts.durable: address_opts += ["node:{durable:true}"] + address = "%s;{%s}"%(q, ",".join(opts.create_option + address_opts)) + if opts.verbose: print "Declaring", address + s.sender(address) + if need_drain: drain(q, s, opts) + c.close() + +def print_header(timestamp): + if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" + else: latency_header="" + print "send-tp\trecv-tp%s"%latency_header + +def parse(parser, lines): # Parse sender/receiver output + return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] + +def parse_senders(senders): + return parse([int],[first_line(p) for p in senders]) + +def parse_receivers(receivers): + return parse([int,float,float,float],[first_line(p) for p in receivers if p]) + +def print_data(send_stats, recv_stats, total_tp): + for send,recv in map(None, send_stats, recv_stats): + line="" + if send: line += "%d"%send[0] + if recv: + line += "\t%d"%recv[0] + if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + if total_tp is not None: + line += "\t%d"%total_tp + total_tp = None + print line + +def print_summary(send_stats, recv_stats, total_tp): + def avg(s): sum(s) / len(s) + send_tp = sum([l[0] for l in send_stats]) + recv_tp = sum([l[0] for l in recv_stats]) + summary = "%d\t%d"%(send_tp, recv_tp) + if recv_stats and len(recv_stats[0]) == 4: + l_min = sum(l[1] for l in recv_stats)/len(recv_stats) + l_max = sum(l[2] for l in recv_stats)/len(recv_stats) + l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) + summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) + summary += "\t%d"%total_tp + print summary + + +class ReadyReceiver: + """A receiver for ready messages""" + def __init__(self, queue, broker, opts): + self.connection = connect(broker, opts) + self.receiver = self.connection.session().receiver(queue) + self.receiver.session.sync() + self.timeout=opts.timeout + + def wait(self, receivers): + try: + for i in receivers: self.receiver.fetch(self.timeout) + self.receiver.session.acknowledge() + self.connection.close() + except qm.Empty: + for r in receivers: + if (r.poll() is not None): + out,err=r.communicate() + raise Exception("Receiver error: %s\n%s" % + (" ".join(r.command), error_msg(out,err))) + raise Exception("Timed out waiting for receivers to be ready") + +def flatten(l): + return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), []) + +class RoundRobin: + def __init__(self,items): + self.items = items + self.index = 0 + + def next(self): + if not self.items: return None + ret = self.items[self.index] + self.index = (self.index+1)%len(self.items) + return ret + +def main(): + opts, args = op.parse_args() + opts.client_host = flatten(opts.client_host) + if not opts.broker: + if opts.client_host: + raise Exception("--broker must be specified if --client_host is.") + opts.broker = ["127.0.0.1"] # Deafult to local broker + opts.broker = flatten(opts.broker) + brokers = RoundRobin(opts.broker) + client_hosts = RoundRobin(opts.client_host) + send_out = "" + receive_out = "" + ready_queue="%s-ready"%(opts.queue_name) + queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] + try: + for i in xrange(opts.repeat): + clear_queues(queues+[ready_queue], opts.broker, opts) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0], opts) + + def start_receivers(): + return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers) ] + + + def start_senders(): + return [ start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders) ] + + if opts.report_header and i == 0: print_header(opts.timestamp) + + if opts.fill_drain: + # First fill the queues, then drain them + start = time.time() + senders = start_senders() + for p in senders: p.wait() + receivers = start_receivers() + for p in receivers: p.wait() + else: + # Run senders and receivers in parallel + receivers = start_receivers() + ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready + start = time.time() + senders = start_senders() + for p in senders + receivers: p.wait() + + total_sent = opts.queues * opts.senders * opts.messages + total_tp = total_sent / (time.time()-start) + send_stats=parse_senders(senders) + recv_stats=parse_receivers(receivers) + if opts.summarize: print_summary(send_stats, recv_stats, total_tp) + else: print_data(send_stats, recv_stats, total_tp) + finally: clients.kill() # No strays + +if __name__ == "__main__": main() + |