diff options
author | Alan Conway <aconway@apache.org> | 2011-03-14 14:33:44 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-03-14 14:33:44 +0000 |
commit | 44315b319809b3d0b51a8843ad349913050d5e66 (patch) | |
tree | 825d2bad53fb8d76f48e447db2ad5684c1409003 /cpp/src/tests/qpid-cpp-benchmark | |
parent | 41d5943c1c1f888baba4d8641d6b548dddf3cf3d (diff) | |
download | qpid-python-44315b319809b3d0b51a8843ad349913050d5e66.tar.gz |
NO-JIRA: Minor improvements to test scripts qpid-cluster-benchmark and qpid-cpp-benchmark
- qpid-cluster-benchmark: added command line options.
- qpid-cpp-benchmark: clean up error handling, fixed a race condition.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1081396 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-x | cpp/src/tests/qpid-cpp-benchmark | 68 |
1 files changed, 44 insertions, 24 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark index 1f77226b4d..6138108558 100755 --- a/cpp/src/tests/qpid-cpp-benchmark +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -77,6 +77,20 @@ def ssh_command(host, command): """Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] +class Clients: + def __init__(self): self.clients=[] + + def add(self, client): + self.clients.append(client) + return client + + def kill(self): + for c in self.clients: + try: c.kill() + except: pass + +clients = Clients() + def start_receive(queue, index, opts, ready_queue, broker, host): address_opts=["create:receiver"] + opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] @@ -101,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE) + return clients.add(Popen(command, stdout=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option)) @@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE) + return clients.add(Popen(command, stdout=PIPE)) def first_line(p): out,err=p.communicate() @@ -133,7 +147,11 @@ def delete_queues(queues, broker): c = qpid.messaging.Connection(broker) c.open() for q in queues: - try: s = c.session().sender("%s;{delete:always}"%(q)) + try: + s = c.session() + snd = s.sender("%s;{delete:always}"%(q)) + snd.close() + s.sync() except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" c.close() @@ -145,7 +163,6 @@ def print_header(timestamp): def parse(parser, lines): # Parse sender/receiver output for l in lines: fn_val = zip(parser, l) - return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): @@ -156,11 +173,12 @@ def parse_receivers(receivers): def print_data(send_stats, recv_stats): for send,recv in map(None, send_stats, recv_stats): - if send: print send[0], + line="" + if send: line += "%d"%send[0] if recv: - print "\t\t%d"%recv[0], - if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]), - print + line += "\t\t%d"%recv[0] + if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + print line def print_summary(send_stats, recv_stats): def avg(s): sum(s) / len(s) @@ -184,11 +202,11 @@ class ReadyReceiver: self.receiver = self.connection.session().receiver( "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) self.receiver.session.sync() - self.timeout=2 + self.timeout=10 def wait(self, receivers): try: - for i in xrange(len(receivers)): self.receiver.fetch(self.timeout) + for i in receivers: self.receiver.fetch(self.timeout) self.connection.close() except qpid.messaging.Empty: for r in receivers: @@ -221,20 +239,22 @@ def main(): receive_out = "" ready_queue="%s-ready"%(opts.queue_name) queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] - for i in xrange(opts.repeat): - delete_queues(queues, opts.broker[0]) - ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.receivers)] - ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. - senders = [start_send(q, opts,brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.senders)] - if opts.report_header and i == 0: print_header(opts.timestamp) - send_stats=parse_senders(senders) - recv_stats=parse_receivers(receivers) - if opts.summarize: print_summary(send_stats, recv_stats) - else: print_data(send_stats, recv_stats) - delete_queues(queues, opts.broker[0]) + try: + for i in xrange(opts.repeat): + delete_queues(queues, opts.broker[0]) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) + receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers)] + ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. + senders = [start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders)] + if opts.report_header and i == 0: print_header(opts.timestamp) + send_stats=parse_senders(senders) + recv_stats=parse_receivers(receivers) + if opts.summarize: print_summary(send_stats, recv_stats) + else: print_data(send_stats, recv_stats) + delete_queues(queues, opts.broker[0]) + finally: clients.kill() # No strays if __name__ == "__main__": main() |