summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid-cpp-benchmark
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /cpp/src/tests/qpid-cpp-benchmark
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-xcpp/src/tests/qpid-cpp-benchmark69
1 files changed, 28 insertions, 41 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark
index 19c01dd08a..d5ad5191ca 100755
--- a/cpp/src/tests/qpid-cpp-benchmark
+++ b/cpp/src/tests/qpid-cpp-benchmark
@@ -37,7 +37,7 @@ op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
help="send N messages per sender (default %default)")
op.add_option("--queue-name", default="benchmark", metavar="NAME",
- help="base name for queues (default %default)")
+ help="base name for queues (default %default)")
op.add_option("--send-rate", default=0, metavar="N",
help="send rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--receive-rate", default=0, metavar="N",
@@ -67,18 +67,16 @@ op.add_option("--sequence", dest="sequence", default=False,
action="store_true", help="add a sequence number to each message")
op.add_option("--connection-options", type="str",
help="Connection options for senders & receivers")
-op.add_option("--flow-control", default=0, type="int", metavar="N",
- help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.")
op.add_option("--durable", default=False, action="store_true",
help="Use durable queues and messages")
op.add_option("--save-received", default=False, action="store_true",
help="Save received message content to files <queuename>-receiver-<n>.msg")
-op.add_option("--group-receivers", default=False, action="store_true",
- help="Run receivers for the same queue on the same host.")
op.add_option("--verbose", default=False, action="store_true",
help="Show commands executed")
op.add_option("--no-delete", default=False, action="store_true",
help="Don't delete the test queues.")
+op.add_option("--fill-drain", default=False, action="store_true",
+ help="First fill the queues, then drain them")
single_quote_re = re.compile("'")
def posix_quote(string):
@@ -150,7 +148,6 @@ def start_send(queue, opts, broker, host):
"--report-header=no",
"--timestamp=%s"%(opts.timestamp and "yes" or "no"),
"--sequence=%s"%(opts.sequence and "yes" or "no"),
- "--flow-control", str(opts.flow_control),
"--durable", str(opts.durable)
]
command += opts.send_arg
@@ -169,18 +166,6 @@ def first_line(p):
raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
return out.split("\n")[0]
-def queue_exists(queue,broker):
- c = qpid.messaging.Connection(broker)
- c.open()
- try:
- s = c.session()
- try:
- s.sender(queue)
- return True
- except qpid.messaging.exceptions.NotFound:
- return False
- finally: c.close()
-
def recreate_queues(queues, brokers, no_delete, opts):
c = qpid.messaging.Connection(brokers[0])
c.open()
@@ -189,15 +174,9 @@ def recreate_queues(queues, brokers, no_delete, opts):
if not no_delete:
try: s.sender("%s;{delete:always}"%(q)).close()
except qpid.messaging.exceptions.NotFound: pass
- # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
- for b in brokers:
- while queue_exists(q,b): time.sleep(0.1);
address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
if opts.verbose: print "Creating", address
s.sender(address)
- # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
- for b in brokers:
- while not queue_exists(q,b): time.sleep(0.1);
c.close()
def print_header(timestamp):
@@ -295,24 +274,32 @@ def main():
recreate_queues(queues, opts.broker, opts.no_delete, opts)
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- if opts.group_receivers: # Run receivers for same queue against same broker.
- receivers = []
- for q in queues:
- b = brokers.next()
- for j in xrange(opts.receivers):
- receivers.append(
- start_receive(q, j, opts, ready_queue, b, client_hosts.next()))
- else: # Don't group receivers
- receivers = [start_receive(q, j, opts, ready_queue,
- brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers)]
-
- ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
- start = time.time()
- senders = [start_send(q, opts,brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.senders)]
+ def start_receivers():
+ return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers) ]
+
+
+ def start_senders():
+ return [ start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders) ]
+
if opts.report_header and i == 0: print_header(opts.timestamp)
- for p in senders + receivers: p.wait()
+
+ if opts.fill_drain:
+ # First fill the queues, then drain them
+ start = time.time()
+ senders = start_senders()
+ for p in senders: p.wait()
+ receivers = start_receivers()
+ for p in receivers: p.wait()
+ else:
+ # Run senders and receivers in parallel
+ receivers = start_receivers()
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready
+ start = time.time()
+ senders = start_senders()
+ for p in senders + receivers: p.wait()
+
total_sent = opts.queues * opts.senders * opts.messages
total_tp = total_sent / (time.time()-start)
send_stats=parse_senders(senders)