diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
tree | 1391da89470593209466df68c0b40b89c14963b1 /cpp/src/tests/qpid-cpp-benchmark | |
parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-x | cpp/src/tests/qpid-cpp-benchmark | 69 |
1 files changed, 28 insertions, 41 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark index 19c01dd08a..d5ad5191ca 100755 --- a/cpp/src/tests/qpid-cpp-benchmark +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -37,7 +37,7 @@ op.add_option("-r", "--receivers", default=1, type="int", metavar="N", op.add_option("-m", "--messages", default=100000, type="int", metavar="N", help="send N messages per sender (default %default)") op.add_option("--queue-name", default="benchmark", metavar="NAME", - help="base name for queues (default %default)") + help="base name for queues (default %default)") op.add_option("--send-rate", default=0, metavar="N", help="send rate limited to N messages/second, 0 means no limit (default %default)") op.add_option("--receive-rate", default=0, metavar="N", @@ -67,18 +67,16 @@ op.add_option("--sequence", dest="sequence", default=False, action="store_true", help="add a sequence number to each message") op.add_option("--connection-options", type="str", help="Connection options for senders & receivers") -op.add_option("--flow-control", default=0, type="int", metavar="N", - help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.") op.add_option("--durable", default=False, action="store_true", help="Use durable queues and messages") op.add_option("--save-received", default=False, action="store_true", help="Save received message content to files <queuename>-receiver-<n>.msg") -op.add_option("--group-receivers", default=False, action="store_true", - help="Run receivers for the same queue on the same host.") op.add_option("--verbose", default=False, action="store_true", help="Show commands executed") op.add_option("--no-delete", default=False, action="store_true", help="Don't delete the test queues.") +op.add_option("--fill-drain", default=False, action="store_true", + help="First fill the queues, then drain them") single_quote_re = re.compile("'") def posix_quote(string): @@ -150,7 +148,6 @@ def start_send(queue, opts, broker, host): "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), "--sequence=%s"%(opts.sequence and "yes" or "no"), - "--flow-control", str(opts.flow_control), "--durable", str(opts.durable) ] command += opts.send_arg @@ -169,18 +166,6 @@ def first_line(p): raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) return out.split("\n")[0] -def queue_exists(queue,broker): - c = qpid.messaging.Connection(broker) - c.open() - try: - s = c.session() - try: - s.sender(queue) - return True - except qpid.messaging.exceptions.NotFound: - return False - finally: c.close() - def recreate_queues(queues, brokers, no_delete, opts): c = qpid.messaging.Connection(brokers[0]) c.open() @@ -189,15 +174,9 @@ def recreate_queues(queues, brokers, no_delete, opts): if not no_delete: try: s.sender("%s;{delete:always}"%(q)).close() except qpid.messaging.exceptions.NotFound: pass - # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate - for b in brokers: - while queue_exists(q,b): time.sleep(0.1); address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) if opts.verbose: print "Creating", address s.sender(address) - # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate - for b in brokers: - while not queue_exists(q,b): time.sleep(0.1); c.close() def print_header(timestamp): @@ -295,24 +274,32 @@ def main(): recreate_queues(queues, opts.broker, opts.no_delete, opts) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - if opts.group_receivers: # Run receivers for same queue against same broker. - receivers = [] - for q in queues: - b = brokers.next() - for j in xrange(opts.receivers): - receivers.append( - start_receive(q, j, opts, ready_queue, b, client_hosts.next())) - else: # Don't group receivers - receivers = [start_receive(q, j, opts, ready_queue, - brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.receivers)] - - ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. - start = time.time() - senders = [start_send(q, opts,brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.senders)] + def start_receivers(): + return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers) ] + + + def start_senders(): + return [ start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders) ] + if opts.report_header and i == 0: print_header(opts.timestamp) - for p in senders + receivers: p.wait() + + if opts.fill_drain: + # First fill the queues, then drain them + start = time.time() + senders = start_senders() + for p in senders: p.wait() + receivers = start_receivers() + for p in receivers: p.wait() + else: + # Run senders and receivers in parallel + receivers = start_receivers() + ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready + start = time.time() + senders = start_senders() + for p in senders + receivers: p.wait() + total_sent = opts.queues * opts.senders * opts.messages total_tp = total_sent / (time.time()-start) send_stats=parse_senders(senders) |