From 2d1b6da0b7e613642a07d9ea6823f269ed12cae5 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 6 Sep 2011 21:46:18 +0000 Subject: QPID-2920: Improvements to qpid-cpp-benchmark. - fixed error message. - add necessary waiting to deal with async wiring in the new cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165879 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/qpid-cpp-benchmark | 46 ++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 6ecff0b52e..74bf2df22d 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -92,7 +92,7 @@ class Clients: clients = Clients() def start_receive(queue, index, opts, ready_queue, broker, host): - address_opts=["create:always"] + opts.receive_option + address_opts=opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] address="%s;{%s}"%(queue,",".join(address_opts)) msg_total=opts.senders*opts.messages @@ -115,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return clients.add(Popen(command, stdout=PIPE)) + return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) @@ -136,34 +136,45 @@ def start_send(queue, opts, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return clients.add(Popen(command, stdout=PIPE)) + return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) -def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip() +def error_msg(out, err): + return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) def first_line(p): out,err=p.communicate() - if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err)) + if p.returncode != 0: + raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) return out.split("\n")[0] -def delete_queues(queues, broker): +def queue_exists(queue,broker): c = qpid.messaging.Connection(broker) c.open() s = c.session() - for q in queues: + try: try: - snd = s.sender("%s;{delete:always}"%(q)) - snd.close() - except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" - s.sync() - c.close() + s.sender(queue) + return True + except qpid.messaging.exceptions.NotFound: + return False + finally: c.close() -def create_queues(queues, broker): - c = qpid.messaging.Connection(broker) +def recreate_queues(queues, brokers): + c = qpid.messaging.Connection(brokers[0]) c.open() s = c.session() + for q in queues: + try: s.sender("%s;{delete:always}"%(q)).close() + except qpid.messaging.exceptions.NotFound: pass + # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. + for b in brokers: + while queue_exists(q,b): time.sleep(0.001); for q in queues: s.sender("%s;{create:always}"%q) - s.sync() + # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. + for b in brokers: + while not queue_exists(q,b): time.sleep(0.001); + c.close() def print_header(timestamp): if timestamp: latency_header="\tl-min\tl-max\tl-avg" @@ -251,10 +262,7 @@ def main(): queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] try: for i in xrange(opts.repeat): - delete_queues(queues, opts.broker[0]) - time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring - create_queues(queues, opts.broker[0]) - time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring + recreate_queues(queues, opts.broker) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) -- cgit v1.2.1