summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-12-15 18:10:12 +0000
committerAlan Conway <aconway@apache.org>2010-12-15 18:10:12 +0000
commit077facba2cddd2c49d14e496dfa942c23a5e66c9 (patch)
treee9c078351733678236a01056a88ec5bf5e50372c /cpp/src
parentc50499e4c309e43367c2ff4ab478d85f88c3124c (diff)
downloadqpid-python-077facba2cddd2c49d14e496dfa942c23a5e66c9.tar.gz
Fix flow control for qpid-cpp-benchmark with multiple senders.
Ensure senders & receivers agree on number of messages sent/received. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rwxr-xr-xcpp/src/tests/qpid-cpp-benchmark17
-rw-r--r--cpp/src/tests/qpid-receive.cpp13
-rw-r--r--cpp/src/tests/qpid-send.cpp8
3 files changed, 21 insertions, 17 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark
index 559adee5a7..e865a49813 100755
--- a/cpp/src/tests/qpid-cpp-benchmark
+++ b/cpp/src/tests/qpid-cpp-benchmark
@@ -75,12 +75,16 @@ def ssh_command(host, command):
"""Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
-def start_receive(queue, opts, ready_queue, broker, host):
+def start_receive(queue, index, opts, ready_queue, broker, host):
address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option))
+ msg_total=opts.senders*opts.messages
+ messages = msg_total/opts.receivers;
+ if (index < msg_total%opts.receivers): messages += 1
+ if (messages == 0): return None
command = ["qpid-receive",
"-b", broker,
"-a", address,
- "-m", str((opts.senders*opts.messages)/opts.receivers),
+ "-m", str(messages),
"--forever",
"--print-content=no",
"--receive-rate", str(opts.receive_rate),
@@ -101,7 +105,6 @@ def start_send(queue, opts, broker, host):
"-b", broker,
"-a", address,
"--messages", str(opts.messages),
- "--send-eos", str(opts.receivers),
"--content-size", str(opts.content_size),
"--send-rate", str(opts.send_rate),
"--report-total",
@@ -118,7 +121,7 @@ def start_send(queue, opts, broker, host):
def first_line(p):
out,err=p.communicate()
- if p.returncode != 0: raise Exception("ERROR:\n%s"%(out))
+ if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
return out.split("\n")[0]
def delete_queues(queues, broker):
@@ -144,7 +147,7 @@ def parse_senders(senders):
return parse([int],[first_line(p) for p in senders])
def parse_receivers(receivers):
- return parse([int,float,float,float],[first_line(p) for p in receivers])
+ return parse([int,float,float,float],[first_line(p) for p in receivers if p])
def print_data(send_stats, recv_stats):
for send,recv in map(None, send_stats, recv_stats):
@@ -216,9 +219,9 @@ def main():
for i in xrange(opts.repeat):
delete_queues(queues, opts.broker[0])
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next())
+ receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.receivers)]
- ready_receiver.wait(receivers) # Wait for receivers to be ready.
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts,brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.senders)]
if opts.report_header and i == 0: print_header(opts.timestamp)
diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp
index 9b84306605..28e229ca27 100644
--- a/cpp/src/tests/qpid-receive.cpp
+++ b/cpp/src/tests/qpid-receive.cpp
@@ -191,8 +191,7 @@ int main(int argc, char ** argv)
int64_t interval = 0;
if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
- Address replyToAddress;
- Sender replyToSender;
+ std::map<std::string,Sender> replyTo;
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
@@ -227,12 +226,12 @@ int main(int argc, char ** argv)
session.acknowledge();
}
if (msg.getReplyTo()) { // Echo message back to reply-to address.
- if (msg.getReplyTo() != replyToAddress) {
- replyToSender = session.createSender(msg.getReplyTo());
- replyToSender.setCapacity(opts.capacity);
- replyToAddress = msg.getReplyTo();
+ Sender& s = replyTo[msg.getReplyTo().str()];
+ if (s.isNull()) {
+ s = session.createSender(msg.getReplyTo());
+ s.setCapacity(opts.capacity);
}
- replyToSender.send(msg);
+ s.send(msg);
}
if (opts.receiveRate) {
qpid::sys::AbsTime waitTill(start, count*interval);
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp
index e5ae6a9e4a..c71cb83f9a 100644
--- a/cpp/src/tests/qpid-send.cpp
+++ b/cpp/src/tests/qpid-send.cpp
@@ -291,7 +291,7 @@ int main(int argc, char ** argv)
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
Receiver flowControlReceiver;
- Address flowControlAddress(Uuid(true).str()+";{create:always}");
+ Address flowControlAddress("flow-"+Uuid(true).str()+";{create:always,delete:always}");
uint flowSent = 0;
if (opts.flowControl) {
flowControlReceiver = session.createReceiver(flowControlAddress);
@@ -322,7 +322,7 @@ int main(int argc, char ** argv)
if (opts.messages && sent >= opts.messages) break;
if (opts.flowControl && flowSent == 2) {
- flowControlReceiver.get(Duration::SECOND*1);
+ flowControlReceiver.get(Duration::SECOND);
--flowSent;
}
@@ -333,11 +333,13 @@ int main(int argc, char ** argv)
}
msg = Message(); // Clear out contents and properties for next iteration
}
+ for ( ; flowSent>0; --flowSent)
+ flowControlReceiver.get(Duration::SECOND);
if (opts.reportTotal) reporter.report();
for (uint i = opts.sendEos; i > 0; --i) {
if (opts.sequence)
msg.getProperties()[SN] = ++sent;
- msg.setContent(EOS);//TODO: add in ability to send digest or similar
+ msg.setContent(EOS); //TODO: add in ability to send digest or similar
sender.send(msg);
}
if (opts.tx) {