diff options
Diffstat (limited to 'cpp')
-rwxr-xr-x | cpp/src/tests/qpid_cpp_benchmark | 37 | ||||
-rw-r--r-- | cpp/src/tests/qpid_receive.cpp | 6 |
2 files changed, 29 insertions, 14 deletions
diff --git a/cpp/src/tests/qpid_cpp_benchmark b/cpp/src/tests/qpid_cpp_benchmark index d830804407..177231f026 100755 --- a/cpp/src/tests/qpid_cpp_benchmark +++ b/cpp/src/tests/qpid_cpp_benchmark @@ -43,7 +43,7 @@ op.add_option("--content-size", default=1024, type="int", metavar="BYTES", op.add_option("--ack-frequency", default=0, metavar="N", type="int", help="receiver ack's every N messages, 0 means unconfirmed") -def start_receive(queue, opts): +def start_receive(queue, opts, ready_queue): return Popen(["qpid_receive", "-b", opts.broker, "-a", "%s;{create:always}"%(queue), @@ -51,6 +51,7 @@ def start_receive(queue, opts): "--print-content=no", "--report-total", "--ack-frequency", str(opts.ack_frequency), + "--ready-address", ready_queue ], stdout=PIPE, stderr=STDOUT) @@ -78,34 +79,42 @@ def delete_queues(queues, broker): except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue" c.close() -def wait_for_queues(queues, broker): - c = qpid.messaging.Connection(broker) - c.open() - s = c.session() - while True: - try: - for q in queues: s.sender(q) - break - except: pass - c.close() - def skip_first_line(text): return "\n".join(text.split("\n")[1:]) def print_output(processes): print wait_for_output(processes.pop(0)), for p in processes: print skip_first_line(wait_for_output(p)), +class ReadyReceiver: + """A receiver for ready messages""" + def __init__(self, queue, broker): + delete_queues([queue], broker) + self.connection = qpid.messaging.Connection(broker) + self.connection.open() + self.receiver = self.connection.session().receiver( + "%s;{create:always,delete:always}"%(queue)) + self.timeout=2 + + def wait(self, n): + try: + for i in xrange(n): self.receiver.fetch(self.timeout) + except qpid.messaging.Empty: raise "Timed out waiting for receivers to be ready" + self.connection.close() + def main(): opts, args = op.parse_args() + ready_queue="%s-ready"%(opts.queue_name) queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] delete_queues(queues, opts.broker) - receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)] - wait_for_queues(queues, opts.broker) # Wait for receivers to be ready + ready_receiver = ReadyReceiver(ready_queue, opts.broker) + receivers = [start_receive(q, opts, ready_queue) for q in queues for i in xrange(opts.receivers)] + ready_receiver.wait(len(receivers)) # Wait for receivers to be ready. senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)] print "Send" print_output(senders) print "\nReceive" print_output(receivers) + print delete_queues(queues, opts.broker) if __name__ == "__main__": main() diff --git a/cpp/src/tests/qpid_receive.cpp b/cpp/src/tests/qpid_receive.cpp index 902e855c2b..d67232ad86 100644 --- a/cpp/src/tests/qpid_receive.cpp +++ b/cpp/src/tests/qpid_receive.cpp @@ -22,6 +22,7 @@ #include <qpid/messaging/Address.h> #include <qpid/messaging/Connection.h> #include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <qpid/messaging/Message.h> #include <qpid/Options.h> @@ -63,6 +64,7 @@ struct Options : public qpid::Options qpid::log::Options log; bool reportTotal; uint reportEvery; + string readyAddress; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -100,6 +102,8 @@ struct Options : public qpid::Options ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover") ("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics") ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") + ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), + "send a message to this address when ready to receive") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -173,6 +177,8 @@ int main(int argc, char ** argv) Duration timeout = opts.getTimeout(); bool done = false; Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery); + if (!opts.readyAddress.empty()) + session.createSender(opts.readyAddress).send(msg); while (!done && receiver.fetch(msg, timeout)) { reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { |