diff options
author | Alan Conway <aconway@apache.org> | 2010-09-30 20:06:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-09-30 20:06:03 +0000 |
commit | b7323d923cfddd71e8efd5c16055239263fbef18 (patch) | |
tree | 97dc18c5296ac971ba2bb535bfed2719aa5db832 | |
parent | f3a5353aa51666bf890af31b16614bef9bb64205 (diff) | |
download | qpid-python-b7323d923cfddd71e8efd5c16055239263fbef18.tar.gz |
Extending qpid-cpp-benchmark for cluster testing
- multiple --broker args have senders/receivers connect to different cluster nodes.
- multiple --client-host args start clients on different hosts via ssh.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1003228 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 112 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 4 |
2 files changed, 74 insertions, 42 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 1831a541f5..85efff9a36 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -18,30 +18,32 @@ # under the License. # -import optparse, time, qpid.messaging +import optparse, time, qpid.messaging, re from threading import Thread from subprocess import Popen, PIPE, STDOUT op = optparse.OptionParser(usage="usage: %prog [options]", description="simple performance benchmarks") -op.add_option("-b", "--broker", default="127.0.0.1", - help="url of broker to connect to") +op.add_option("-b", "--broker", default=[], action="append", type="str", + help="url of broker(s) to connect to, round robin on multiple brokers") +op.add_option("-c", "--client-host", default=[], action="append", type="str", + help="host(s) to run clients on via ssh, round robin on mulple hosts") op.add_option("-q", "--queues", default=1, type="int", metavar="N", help="create N queues (default %default)") op.add_option("-s", "--senders", default=1, type="int", metavar="N", - help="start N senders per queue (default %default)") + help="start N senders per queue (default %default)") op.add_option("-r", "--receivers", default=1, type="int", metavar="N", - help="start N receivers per queue (default %default)") + help="start N receivers per queue (default %default)") op.add_option("-m", "--messages", default=100000, type="int", metavar="N", - help="send N messages per sender (default %default)") -op.add_option("--queue-name", default="benchmark", + help="send N messages per sender (default %default)") +op.add_option("--queue-name", default="benchmark", metavar="NAME", help="base name for queues (default %default)") op.add_option("--send-rate", default=0, metavar="R", help="send rate limited to R messages/second, 0 means no limit (default %default)") op.add_option("--content-size", default=1024, type="int", metavar="BYTES", help="message size in bytes (default %default)") op.add_option("--ack-frequency", default=0, metavar="N", type="int", - help="receiver ack's every N messages, 0 means unconfirmed (default %d)") + help="receiver ack's every N messages, 0 means unconfirmed (default %default)") op.add_option("--no-report-header", dest="report_header", default=True, action="store_false", help="don't print header on report") op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") @@ -52,35 +54,45 @@ op.add_option("--receive-option", default=[], action="append", type="str", op.add_option("--no-timestamp", dest="timestamp", default=True, action="store_false", help="don't add a timestamp, no latency results") -def start_receive(queue, opts, ready_queue): +single_quote_re = re.compile("'") + +def posix_quote(string): + """ Quote a string for use as an argument in a posix shell""" + return "'" + single_quote_re.sub("\\'", string) + "'"; + +def ssh_command(host, command): + """Convert command into an ssh command on host with quoting""" + return ["ssh", host] + [posix_quote(arg) for arg in command] + +def start_receive(queue, opts, ready_queue, broker, host): address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) - return Popen(["qpid-receive", - "-b", opts.broker, - "-a", address, - "--forever", - "--print-content=no", - "--report-total", - "--ack-frequency", str(opts.ack_frequency), - "--ready-address", ready_queue, - "--report-header=no", - ], - stdout=PIPE, stderr=STDOUT) - -def start_send(queue, opts): + command = ["qpid-receive", + "-b", broker, + "-a", address, + "--forever", + "--print-content=no", + "--report-total", + "--ack-frequency", str(opts.ack_frequency), + "--ready-address", ready_queue, + "--report-header=no"] + if host: command = ssh_command(host, command) + return Popen(command, stdout=PIPE, stderr=STDOUT) + +def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option)) - return Popen(["qpid-send", - "-b", opts.broker, - "-a", address, - "--messages", str(opts.messages), - "--send-eos", str(opts.receivers), - "--content-size", str(opts.content_size), - "--send-rate", str(opts.send_rate), - "--report-total", - "--report-header=no", - "--timestamp=%s"%(opts.timestamp and "yes" or "no"), - "--sequence=no", - ], - stdout=PIPE, stderr=STDOUT) + command = ["qpid-send", + "-b", broker, + "-a", address, + "--messages", str(opts.messages), + "--send-eos", str(opts.receivers), + "--content-size", str(opts.content_size), + "--send-rate", str(opts.send_rate), + "--report-total", + "--report-header=no", + "--timestamp=%s"%(opts.timestamp and "yes" or "no"), + "--sequence=no"] + if host: command = ssh_command(host, command) + return Popen(command, stdout=PIPE, stderr=STDOUT) def wait_for_output(p): out,err=p.communicate() @@ -125,21 +137,41 @@ class ReadyReceiver: if (r.poll()): raise "Receiver error: %s"%(wait_for_output(r)) raise "Timed out waiting for receivers to be ready" +def flatten(l): return sum(map(lambda s: s.split(","), l),[]) + +class RoundRobin: + def __init__(self,items): + self.items = items + self.index = 0 + + def next(self): + if not self.items: return None + ret = self.items[self.index] + self.index = (self.index+1)%len(self.items) + return ret + def main(): opts, args = op.parse_args() + if not opts.broker: opts.broker = ["127.0.0.1"] # Deafult to local broker + opts.broker = flatten(opts.broker) + opts.client_host = flatten(opts.client_host) + brokers = RoundRobin(opts.broker) + client_hosts = RoundRobin(opts.client_host) send_out = "" receive_out = "" ready_queue="%s-ready"%(opts.queue_name) queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] for i in xrange(opts.repeat): - delete_queues(queues, opts.broker) - ready_receiver = ReadyReceiver(ready_queue, opts.broker) - receivers = [start_receive(q, opts, ready_queue) + + delete_queues(queues, opts.broker[0]) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) + receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] ready_receiver.wait(receivers) # Wait for receivers to be ready. - senders = [start_send(q, opts) for q in queues for j in xrange(opts.senders)] + senders = [start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders)] print_output(senders, receivers, opts.report_header and i == 0) - delete_queues(queues, opts.broker) + delete_queues(queues, opts.broker[0]) if __name__ == "__main__": main() diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index a0394ccd21..c8bb58ac54 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -102,8 +102,8 @@ struct Options : public qpid::Options ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics") ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") - ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), - "send a message to this address when ready to receive") + ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") + ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") ("help", qpid::optValue(help), "print this usage statement"); add(log); } |