summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-06 21:46:18 +0000
committerAlan Conway <aconway@apache.org>2011-09-06 21:46:18 +0000
commit2d1b6da0b7e613642a07d9ea6823f269ed12cae5 (patch)
tree17362f775e6bba8fe9b334dec935c4fb3dd16bca
parent1385a81069a24cfd53d5f0fed14c4b7c12fa5102 (diff)
downloadqpid-python-2d1b6da0b7e613642a07d9ea6823f269ed12cae5.tar.gz
QPID-2920: Improvements to qpid-cpp-benchmark.
- fixed error message. - add necessary waiting to deal with async wiring in the new cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165879 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark46
1 files changed, 27 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 6ecff0b52e..74bf2df22d 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -92,7 +92,7 @@ class Clients:
clients = Clients()
def start_receive(queue, index, opts, ready_queue, broker, host):
- address_opts=["create:always"] + opts.receive_option
+ address_opts=opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
address="%s;{%s}"%(queue,",".join(address_opts))
msg_total=opts.senders*opts.messages
@@ -115,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
@@ -136,34 +136,45 @@ def start_send(queue, opts, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
-def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip()
+def error_msg(out, err):
+ return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
def first_line(p):
out,err=p.communicate()
- if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err))
+ if p.returncode != 0:
+ raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
return out.split("\n")[0]
-def delete_queues(queues, broker):
+def queue_exists(queue,broker):
c = qpid.messaging.Connection(broker)
c.open()
s = c.session()
- for q in queues:
+ try:
try:
- snd = s.sender("%s;{delete:always}"%(q))
- snd.close()
- except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
- s.sync()
- c.close()
+ s.sender(queue)
+ return True
+ except qpid.messaging.exceptions.NotFound:
+ return False
+ finally: c.close()
-def create_queues(queues, broker):
- c = qpid.messaging.Connection(broker)
+def recreate_queues(queues, brokers):
+ c = qpid.messaging.Connection(brokers[0])
c.open()
s = c.session()
for q in queues:
+ try: s.sender("%s;{delete:always}"%(q)).close()
+ except qpid.messaging.exceptions.NotFound: pass
+ # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+ for b in brokers:
+ while queue_exists(q,b): time.sleep(0.001);
+ for q in queues:
s.sender("%s;{create:always}"%q)
- s.sync()
+ # FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
+ for b in brokers:
+ while not queue_exists(q,b): time.sleep(0.001);
+ c.close()
def print_header(timestamp):
if timestamp: latency_header="\tl-min\tl-max\tl-avg"
@@ -251,10 +262,7 @@ def main():
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
try:
for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker[0])
- time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
- create_queues(queues, opts.broker[0])
- time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
+ recreate_queues(queues, opts.broker)
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
receivers = [start_receive(q, j, opts, ready_queue,
brokers.next(), client_hosts.next())