summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-09-30 20:06:03 +0000
committerAlan Conway <aconway@apache.org>2010-09-30 20:06:03 +0000
commitb7323d923cfddd71e8efd5c16055239263fbef18 (patch)
tree97dc18c5296ac971ba2bb535bfed2719aa5db832
parentf3a5353aa51666bf890af31b16614bef9bb64205 (diff)
downloadqpid-python-b7323d923cfddd71e8efd5c16055239263fbef18.tar.gz
Extending qpid-cpp-benchmark for cluster testing
- multiple --broker args have senders/receivers connect to different cluster nodes. - multiple --client-host args start clients on different hosts via ssh. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1003228 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark112
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp4
2 files changed, 74 insertions, 42 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 1831a541f5..85efff9a36 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -18,30 +18,32 @@
# under the License.
#
-import optparse, time, qpid.messaging
+import optparse, time, qpid.messaging, re
from threading import Thread
from subprocess import Popen, PIPE, STDOUT
op = optparse.OptionParser(usage="usage: %prog [options]",
description="simple performance benchmarks")
-op.add_option("-b", "--broker", default="127.0.0.1",
- help="url of broker to connect to")
+op.add_option("-b", "--broker", default=[], action="append", type="str",
+ help="url of broker(s) to connect to, round robin on multiple brokers")
+op.add_option("-c", "--client-host", default=[], action="append", type="str",
+ help="host(s) to run clients on via ssh, round robin on mulple hosts")
op.add_option("-q", "--queues", default=1, type="int", metavar="N",
help="create N queues (default %default)")
op.add_option("-s", "--senders", default=1, type="int", metavar="N",
- help="start N senders per queue (default %default)")
+ help="start N senders per queue (default %default)")
op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
- help="start N receivers per queue (default %default)")
+ help="start N receivers per queue (default %default)")
op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
- help="send N messages per sender (default %default)")
-op.add_option("--queue-name", default="benchmark",
+ help="send N messages per sender (default %default)")
+op.add_option("--queue-name", default="benchmark", metavar="NAME",
help="base name for queues (default %default)")
op.add_option("--send-rate", default=0, metavar="R",
help="send rate limited to R messages/second, 0 means no limit (default %default)")
op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
help="message size in bytes (default %default)")
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
- help="receiver ack's every N messages, 0 means unconfirmed (default %d)")
+ help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
op.add_option("--no-report-header", dest="report_header", default=True,
action="store_false", help="don't print header on report")
op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
@@ -52,35 +54,45 @@ op.add_option("--receive-option", default=[], action="append", type="str",
op.add_option("--no-timestamp", dest="timestamp", default=True,
action="store_false", help="don't add a timestamp, no latency results")
-def start_receive(queue, opts, ready_queue):
+single_quote_re = re.compile("'")
+
+def posix_quote(string):
+ """ Quote a string for use as an argument in a posix shell"""
+ return "'" + single_quote_re.sub("\\'", string) + "'";
+
+def ssh_command(host, command):
+ """Convert command into an ssh command on host with quoting"""
+ return ["ssh", host] + [posix_quote(arg) for arg in command]
+
+def start_receive(queue, opts, ready_queue, broker, host):
address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option))
- return Popen(["qpid-receive",
- "-b", opts.broker,
- "-a", address,
- "--forever",
- "--print-content=no",
- "--report-total",
- "--ack-frequency", str(opts.ack_frequency),
- "--ready-address", ready_queue,
- "--report-header=no",
- ],
- stdout=PIPE, stderr=STDOUT)
-
-def start_send(queue, opts):
+ command = ["qpid-receive",
+ "-b", broker,
+ "-a", address,
+ "--forever",
+ "--print-content=no",
+ "--report-total",
+ "--ack-frequency", str(opts.ack_frequency),
+ "--ready-address", ready_queue,
+ "--report-header=no"]
+ if host: command = ssh_command(host, command)
+ return Popen(command, stdout=PIPE, stderr=STDOUT)
+
+def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option))
- return Popen(["qpid-send",
- "-b", opts.broker,
- "-a", address,
- "--messages", str(opts.messages),
- "--send-eos", str(opts.receivers),
- "--content-size", str(opts.content_size),
- "--send-rate", str(opts.send_rate),
- "--report-total",
- "--report-header=no",
- "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
- "--sequence=no",
- ],
- stdout=PIPE, stderr=STDOUT)
+ command = ["qpid-send",
+ "-b", broker,
+ "-a", address,
+ "--messages", str(opts.messages),
+ "--send-eos", str(opts.receivers),
+ "--content-size", str(opts.content_size),
+ "--send-rate", str(opts.send_rate),
+ "--report-total",
+ "--report-header=no",
+ "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
+ "--sequence=no"]
+ if host: command = ssh_command(host, command)
+ return Popen(command, stdout=PIPE, stderr=STDOUT)
def wait_for_output(p):
out,err=p.communicate()
@@ -125,21 +137,41 @@ class ReadyReceiver:
if (r.poll()): raise "Receiver error: %s"%(wait_for_output(r))
raise "Timed out waiting for receivers to be ready"
+def flatten(l): return sum(map(lambda s: s.split(","), l),[])
+
+class RoundRobin:
+ def __init__(self,items):
+ self.items = items
+ self.index = 0
+
+ def next(self):
+ if not self.items: return None
+ ret = self.items[self.index]
+ self.index = (self.index+1)%len(self.items)
+ return ret
+
def main():
opts, args = op.parse_args()
+ if not opts.broker: opts.broker = ["127.0.0.1"] # Deafult to local broker
+ opts.broker = flatten(opts.broker)
+ opts.client_host = flatten(opts.client_host)
+ brokers = RoundRobin(opts.broker)
+ client_hosts = RoundRobin(opts.client_host)
send_out = ""
receive_out = ""
ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker)
- ready_receiver = ReadyReceiver(ready_queue, opts.broker)
- receivers = [start_receive(q, opts, ready_queue)
+
+ delete_queues(queues, opts.broker[0])
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+ receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.receivers)]
ready_receiver.wait(receivers) # Wait for receivers to be ready.
- senders = [start_send(q, opts) for q in queues for j in xrange(opts.senders)]
+ senders = [start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders)]
print_output(senders, receivers, opts.report_header and i == 0)
- delete_queues(queues, opts.broker)
+ delete_queues(queues, opts.broker[0])
if __name__ == "__main__": main()
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index a0394ccd21..c8bb58ac54 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -102,8 +102,8 @@ struct Options : public qpid::Options
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics")
("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
- ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"),
- "send a message to this address when ready to receive")
+ ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
+ ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}