diff options
author | Alan Conway <aconway@apache.org> | 2011-07-22 22:05:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-07-22 22:05:47 +0000 |
commit | 4bdcae965d344ca21210303b47d941e13a9a4e3c (patch) | |
tree | 98e2337c1930a5610b0c04a45dc429a05a0ca8cd | |
parent | 908e792680b6b3a0c0b366ddc2db4eab4f5e80c9 (diff) | |
download | qpid-python-4bdcae965d344ca21210303b47d941e13a9a4e3c.tar.gz |
QPID-2920: Improvements to qpid-cpp-benchmark.
- fixed error message.
- add necessary waiting to deal with async wiring in the new cluster.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1149747 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 6a1c187214..fc047e935f 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -92,7 +92,7 @@ class Clients: clients = Clients() def start_receive(queue, index, opts, ready_queue, broker, host): - address_opts=["create:always"] + opts.receive_option + address_opts=opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] address="%s;{%s}"%(queue,",".join(address_opts)) msg_total=opts.senders*opts.messages @@ -115,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return clients.add(Popen(command, stdout=PIPE)) + return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) @@ -136,34 +136,45 @@ def start_send(queue, opts, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return clients.add(Popen(command, stdout=PIPE)) + return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) -def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip() +def error_msg(out, err): + return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) def first_line(p): out,err=p.communicate() - if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err)) + if p.returncode != 0: + raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) return out.split("\n")[0] -def delete_queues(queues, broker): +def queue_exists(queue,broker): c = qpid.messaging.Connection(broker) c.open() s = c.session() - for q in queues: + try: try: - snd = s.sender("%s;{delete:always}"%(q)) - snd.close() - except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" - s.sync() - c.close() + s.sender(queue) + return True + except qpid.messaging.exceptions.NotFound: + return False + finally: c.close() -def create_queues(queues, broker): - c = qpid.messaging.Connection(broker) +def recreate_queues(queues, brokers): + c = qpid.messaging.Connection(brokers[0]) c.open() s = c.session() for q in queues: + try: s.sender("%s;{delete:always}"%(q)).close() + except qpid.messaging.exceptions.NotFound: pass + # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. + for b in brokers: + while queue_exists(q,b): time.sleep(0.001); + for q in queues: s.sender("%s;{create:always}"%q) - s.sync() + # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate. + for b in brokers: + while not queue_exists(q,b): time.sleep(0.001); + c.close() def print_header(timestamp): if timestamp: latency_header="\tl-min\tl-max\tl-avg" @@ -250,10 +261,7 @@ def main(): queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] try: for i in xrange(opts.repeat): - delete_queues(queues, opts.broker[0]) - time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring - create_queues(queues, opts.broker[0]) - time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring + recreate_queues(queues, opts.broker) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) |