summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-04-12 17:49:57 +0000
committerAlan Conway <aconway@apache.org>2010-04-12 17:49:57 +0000
commit9c6d2488b20df5ca73300f01a36f7f89fe2dd929 (patch)
tree9747e1e9b0b223b72bcb1e80ef6cf540e9f1df97 /cpp
parent9d2af98f3234ef0f635d01cbb1540e0d2b00264d (diff)
downloadqpid-python-9c6d2488b20df5ca73300f01a36f7f89fe2dd929.tar.gz
qpid_cpp_benchmark waits for receivers to be ready before starting senders.
This avoids exaggerated latency numbers due to messages siting on the queue while receivers are connecting and subscribing. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@933333 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/src/tests/qpid_cpp_benchmark37
-rw-r--r--cpp/src/tests/qpid_receive.cpp6
2 files changed, 29 insertions, 14 deletions
diff --git a/cpp/src/tests/qpid_cpp_benchmark b/cpp/src/tests/qpid_cpp_benchmark
index d830804407..177231f026 100755
--- a/cpp/src/tests/qpid_cpp_benchmark
+++ b/cpp/src/tests/qpid_cpp_benchmark
@@ -43,7 +43,7 @@ op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
help="receiver ack's every N messages, 0 means unconfirmed")
-def start_receive(queue, opts):
+def start_receive(queue, opts, ready_queue):
return Popen(["qpid_receive",
"-b", opts.broker,
"-a", "%s;{create:always}"%(queue),
@@ -51,6 +51,7 @@ def start_receive(queue, opts):
"--print-content=no",
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
+ "--ready-address", ready_queue
],
stdout=PIPE, stderr=STDOUT)
@@ -78,34 +79,42 @@ def delete_queues(queues, broker):
except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue"
c.close()
-def wait_for_queues(queues, broker):
- c = qpid.messaging.Connection(broker)
- c.open()
- s = c.session()
- while True:
- try:
- for q in queues: s.sender(q)
- break
- except: pass
- c.close()
-
def skip_first_line(text): return "\n".join(text.split("\n")[1:])
def print_output(processes):
print wait_for_output(processes.pop(0)),
for p in processes: print skip_first_line(wait_for_output(p)),
+class ReadyReceiver:
+ """A receiver for ready messages"""
+ def __init__(self, queue, broker):
+ delete_queues([queue], broker)
+ self.connection = qpid.messaging.Connection(broker)
+ self.connection.open()
+ self.receiver = self.connection.session().receiver(
+ "%s;{create:always,delete:always}"%(queue))
+ self.timeout=2
+
+ def wait(self, n):
+ try:
+ for i in xrange(n): self.receiver.fetch(self.timeout)
+ except qpid.messaging.Empty: raise "Timed out waiting for receivers to be ready"
+ self.connection.close()
+
def main():
opts, args = op.parse_args()
+ ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
delete_queues(queues, opts.broker)
- receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)]
- wait_for_queues(queues, opts.broker) # Wait for receivers to be ready
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker)
+ receivers = [start_receive(q, opts, ready_queue) for q in queues for i in xrange(opts.receivers)]
+ ready_receiver.wait(len(receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)]
print "Send"
print_output(senders)
print "\nReceive"
print_output(receivers)
+ print
delete_queues(queues, opts.broker)
if __name__ == "__main__": main()
diff --git a/cpp/src/tests/qpid_receive.cpp b/cpp/src/tests/qpid_receive.cpp
index 902e855c2b..d67232ad86 100644
--- a/cpp/src/tests/qpid_receive.cpp
+++ b/cpp/src/tests/qpid_receive.cpp
@@ -22,6 +22,7 @@
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/Options.h>
@@ -63,6 +64,7 @@ struct Options : public qpid::Options
qpid::log::Options log;
bool reportTotal;
uint reportEvery;
+ string readyAddress;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -100,6 +102,8 @@ struct Options : public qpid::Options
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics")
("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
+ ("ready-address", qpid::optValue(readyAddress, "ADDRESS"),
+ "send a message to this address when ready to receive")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -173,6 +177,8 @@ int main(int argc, char ** argv)
Duration timeout = opts.getTimeout();
bool done = false;
Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery);
+ if (!opts.readyAddress.empty())
+ session.createSender(opts.readyAddress).send(msg);
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {