summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid_cpp_benchmark
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/qpid_cpp_benchmark')
-rwxr-xr-xcpp/src/tests/qpid_cpp_benchmark37
1 files changed, 23 insertions, 14 deletions
diff --git a/cpp/src/tests/qpid_cpp_benchmark b/cpp/src/tests/qpid_cpp_benchmark
index d830804407..177231f026 100755
--- a/cpp/src/tests/qpid_cpp_benchmark
+++ b/cpp/src/tests/qpid_cpp_benchmark
@@ -43,7 +43,7 @@ op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
help="receiver ack's every N messages, 0 means unconfirmed")
-def start_receive(queue, opts):
+def start_receive(queue, opts, ready_queue):
return Popen(["qpid_receive",
"-b", opts.broker,
"-a", "%s;{create:always}"%(queue),
@@ -51,6 +51,7 @@ def start_receive(queue, opts):
"--print-content=no",
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
+ "--ready-address", ready_queue
],
stdout=PIPE, stderr=STDOUT)
@@ -78,34 +79,42 @@ def delete_queues(queues, broker):
except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue"
c.close()
-def wait_for_queues(queues, broker):
- c = qpid.messaging.Connection(broker)
- c.open()
- s = c.session()
- while True:
- try:
- for q in queues: s.sender(q)
- break
- except: pass
- c.close()
-
def skip_first_line(text): return "\n".join(text.split("\n")[1:])
def print_output(processes):
print wait_for_output(processes.pop(0)),
for p in processes: print skip_first_line(wait_for_output(p)),
+class ReadyReceiver:
+ """A receiver for ready messages"""
+ def __init__(self, queue, broker):
+ delete_queues([queue], broker)
+ self.connection = qpid.messaging.Connection(broker)
+ self.connection.open()
+ self.receiver = self.connection.session().receiver(
+ "%s;{create:always,delete:always}"%(queue))
+ self.timeout=2
+
+ def wait(self, n):
+ try:
+ for i in xrange(n): self.receiver.fetch(self.timeout)
+ except qpid.messaging.Empty: raise "Timed out waiting for receivers to be ready"
+ self.connection.close()
+
def main():
opts, args = op.parse_args()
+ ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
delete_queues(queues, opts.broker)
- receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)]
- wait_for_queues(queues, opts.broker) # Wait for receivers to be ready
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker)
+ receivers = [start_receive(q, opts, ready_queue) for q in queues for i in xrange(opts.receivers)]
+ ready_receiver.wait(len(receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)]
print "Send"
print_output(senders)
print "\nReceive"
print_output(receivers)
+ print
delete_queues(queues, opts.broker)
if __name__ == "__main__": main()