summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-06 21:46:05 +0000
committerAlan Conway <aconway@apache.org>2011-09-06 21:46:05 +0000
commitc2ca77fd1369082998b41a3ea926dc9c87819836 (patch)
tree2690d12887c44e49ee2d67b33ab3979f075821be
parentc3ac08ec76c3882e42993ab5cf76e2360aba54d3 (diff)
downloadqpid-python-c2ca77fd1369082998b41a3ea926dc9c87819836.tar.gz
QPID-2920: minor improvements to qpid-cpp-benchmark
Re-arranged queue creation/deletion to avoid wiring races. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-1@1165877 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark29
1 files changed, 20 insertions, 9 deletions
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 7ac100f683..6ecff0b52e 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -92,7 +92,7 @@ class Clients:
clients = Clients()
def start_receive(queue, index, opts, ready_queue, broker, host):
- address_opts=["create:receiver"] + opts.receive_option
+ address_opts=["create:always"] + opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
address="%s;{%s}"%(queue,",".join(address_opts))
msg_total=opts.senders*opts.messages
@@ -108,7 +108,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
"--receive-rate", str(opts.receive_rate),
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
- "--ready-address", ready_queue,
+ "--ready-address", "%s;{create:always}"%ready_queue,
"--report-header=no"
]
command += opts.receive_arg
@@ -118,7 +118,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
return clients.add(Popen(command, stdout=PIPE))
def start_send(queue, opts, broker, host):
- address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:receiver"]))
+ address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
command = ["qpid-send",
"-b", broker,
"-a", address,
@@ -138,23 +138,33 @@ def start_send(queue, opts, broker, host):
if host: command = ssh_command(host, command)
return clients.add(Popen(command, stdout=PIPE))
+def error_msg(out, err): return ("\n".join(filter(None, [out, err]))).strip()
+
def first_line(p):
out,err=p.communicate()
- if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
+ if p.returncode != 0: raise Exception("Process failed: %s"%error_msg(out,err))
return out.split("\n")[0]
def delete_queues(queues, broker):
c = qpid.messaging.Connection(broker)
c.open()
+ s = c.session()
for q in queues:
try:
- s = c.session()
snd = s.sender("%s;{delete:always}"%(q))
snd.close()
- s.sync()
except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
+ s.sync()
c.close()
+def create_queues(queues, broker):
+ c = qpid.messaging.Connection(broker)
+ c.open()
+ s = c.session()
+ for q in queues:
+ s.sender("%s;{create:always}"%q)
+ s.sync()
+
def print_header(timestamp):
if timestamp: latency_header="\tl-min\tl-max\tl-avg"
else: latency_header=""
@@ -196,7 +206,6 @@ def print_summary(send_stats, recv_stats):
class ReadyReceiver:
"""A receiver for ready messages"""
def __init__(self, queue, broker):
- delete_queues([queue], broker)
self.connection = qpid.messaging.Connection(broker)
self.connection.open()
self.receiver = self.connection.session().receiver(
@@ -212,7 +221,7 @@ class ReadyReceiver:
for r in receivers:
if (r.poll() is not None):
out,err=r.communicate()
- raise Exception("Receiver error: %s"%(out))
+ raise Exception("Receiver error: %s"%error_msg(out,err))
raise Exception("Timed out waiting for receivers to be ready")
def flatten(l):
@@ -243,8 +252,10 @@ def main():
try:
for i in xrange(opts.repeat):
delete_queues(queues, opts.broker[0])
+ time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
+ create_queues(queues, opts.broker[0])
+ time.sleep(.1) # FIXME aconway 2011-03-18: new cluster async wiring
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- time.sleep(.1) # FIXME aconway 2011-03-16: new cluster async wiring
receivers = [start_receive(q, j, opts, ready_queue,
brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.receivers)]