summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-07-22 22:05:47 +0000
committerAlan Conway <aconway@apache.org>2011-07-22 22:05:47 +0000
commit4bdcae965d344ca21210303b47d941e13a9a4e3c (patch)
tree98e2337c1930a5610b0c04a45dc429a05a0ca8cd
parent908e792680b6b3a0c0b366ddc2db4eab4f5e80c9 (diff)
downloadqpid-python-4bdcae965d344ca21210303b47d941e13a9a4e3c.tar.gz
QPID-2920: Improvements to qpid-cpp-benchmark.
- fixed error message. - add necessary waiting to deal with async wiring in the new cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1149747 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark46
1 files changed, 27 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 6a1c187214..fc047e935f 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -92,7 +92,7 @@ class Clients:
clients = Clients()
def start_receive(queue, index, opts, ready_queue, broker, host):
- address_opts=["create:always"] + opts.receive_option
+ address_opts=opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
address="%s;{%s}"%(queue,",".join(address_opts))
msg_total=opts.senders*opts.messages
@@ -115,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
@@ -136,34 +136,45 @@ def start_send(queue, opts, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
-def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip()
+def error_msg(out, err):
+ return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
def first_line(p):
out,err=p.communicate()
- if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err))
+ if p.returncode != 0:
+ raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
return out.split("\n")[0]
-def delete_queues(queues, broker):
+def queue_exists(queue,broker):
c = qpid.messaging.Connection(broker)
c.open()
s = c.session()
- for q in queues:
+ try:
try:
- snd = s.sender("%s;{delete:always}"%(q))
- snd.close()
- except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
- s.sync()
- c.close()
+ s.sender(queue)
+ return True
+ except qpid.messaging.exceptions.NotFound:
+ return False
+ finally: c.close()
-def create_queues(queues, broker):
- c = qpid.messaging.Connection(broker)
+def recreate_queues(queues, brokers):
+ c = qpid.messaging.Connection(brokers[0])
c.open()
s = c.session()
for q in queues:
+ try: s.sender("%s;{delete:always}"%(q)).close()
+ except qpid.messaging.exceptions.NotFound: pass
+ # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+ for b in brokers:
+ while queue_exists(q,b): time.sleep(0.001);
+ for q in queues:
s.sender("%s;{create:always}"%q)
- s.sync()
+ # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+ for b in brokers:
+ while not queue_exists(q,b): time.sleep(0.001);
+ c.close()
def print_header(timestamp):
if timestamp: latency_header="\tl-min\tl-max\tl-avg"
@@ -250,10 +261,7 @@ def main():
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
try:
for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker[0])
- time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
- create_queues(queues, opts.broker[0])
- time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
+ recreate_queues(queues, opts.broker)
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
receivers = [start_receive(q, j, opts, ready_queue,
brokers.next(), client_hosts.next())