diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /java/tools | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools')
16 files changed, 1753 insertions, 884 deletions
diff --git a/java/tools/bin/Profile-run-from-source b/java/tools/bin/Profile-run-from-source index f8ec45ccff..179c365450 100755 --- a/java/tools/bin/Profile-run-from-source +++ b/java/tools/bin/Profile-run-from-source @@ -49,7 +49,7 @@ export LOG_CONFIG="-Dlog4j.configuration=file:///$QPID_CHECKOUT/java/tools/etc/t #------------- Required for qpid-python-testkit ----------------------------------------- -PYTHONPATH=$QPID_CHECKOUT/python/qpid:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH +PYTHONPATH=$QPID_CHECKOUT/python:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH export PATH=$QPID_CHECKOUT/python:$PATH if [ -x $QPID_CHECKOUT/cpp/src/qpidd ]; then diff --git a/java/tools/bin/mercury-controller b/java/tools/bin/mercury-controller index fab8614039..fab8614039 100644..100755 --- a/java/tools/bin/mercury-controller +++ b/java/tools/bin/mercury-controller diff --git a/java/tools/bin/mercury-start-consumers b/java/tools/bin/mercury-start-consumers index c71fc0c21f..c71fc0c21f 100644..100755 --- a/java/tools/bin/mercury-start-consumers +++ b/java/tools/bin/mercury-start-consumers diff --git a/java/tools/bin/mercury-start-producers b/java/tools/bin/mercury-start-producers index 7ba0286f7c..7ba0286f7c 100644..100755 --- a/java/tools/bin/mercury-start-producers +++ b/java/tools/bin/mercury-start-producers diff --git a/java/tools/bin/qpid-jms-benchmark b/java/tools/bin/qpid-jms-benchmark new file mode 100755 index 0000000000..3d712a27dc --- /dev/null +++ b/java/tools/bin/qpid-jms-benchmark @@ -0,0 +1,316 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time, qpid.messaging, re +from threading import Thread +from subprocess import Popen, PIPE, STDOUT + +op = optparse.OptionParser(usage="usage: %prog [options]", + description="simple performance benchmarks") +op.add_option("-b", "--broker", default=[], action="append", type="str", + help="url of broker(s) to connect to, round robin on multiple brokers") +op.add_option("-c", "--client-host", default=[], action="append", type="str", + help="host(s) to run clients on via ssh, round robin on mulple hosts") +op.add_option("-q", "--queues", default=1, type="int", metavar="N", + help="create N queues (default %default)") +op.add_option("-s", "--senders", default=1, type="int", metavar="N", + help="start N senders per queue (default %default)") +op.add_option("-r", "--receivers", default=1, type="int", metavar="N", + help="start N receivers per queue (default %default)") +op.add_option("-m", "--messages", default=100000, type="int", metavar="N", + help="send N messages per sender (default %default)") +op.add_option("--queue-name", default="benchmark", metavar="NAME", + help="base name for queues (default %default)") +op.add_option("--send-rate", default=0, metavar="N", + help="send rate limited to N messages/second, 0 means no limit (default %default)") +op.add_option("--receive-rate", default=0, metavar="N", + help="receive rate limited to N messages/second, 0 means no limit (default %default)") +op.add_option("--content-size", default=1024, type="int", metavar="BYTES", + help="message size in bytes (default %default)") +op.add_option("--ack-frequency", default=100, metavar="N", type="int", + help="receiver ack's every N messages, 0 means unconfirmed (default %default)") +op.add_option("--no-report-header", dest="report_header", default=True, + action="store_false", help="don't print header on report") +op.add_option("--summarize", default=False, action="store_true", + help="print summary statistics for multiple senders/receivers: total throughput, average latency") +op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") +op.add_option("--send-option", default=[], action="append", type="str", + help="Additional option for sending addresses") +op.add_option("--receive-option", default=[], action="append", type="str", + help="Additional option for receiving addresses") +op.add_option("--create-option", default=[], action="append", type="str", + help="Additional option for creating addresses") +op.add_option("--send-arg", default=[], action="append", type="str", + help="Additional argument for qpid-send") +op.add_option("--receive-arg", default=[], action="append", type="str", + help="Additional argument for qpid-receive") +op.add_option("--no-timestamp", dest="timestamp", default=True, + action="store_false", help="don't add a timestamp, no latency results") +op.add_option("--sequence", dest="sequence", default=False, + action="store_true", help="add a sequence number to each message") +op.add_option("--connection-options", type="str", + help="Connection options for senders & receivers") +op.add_option("--durable", default=False, action="store_true", + help="Use durable queues and messages") +op.add_option("--save-received", default=False, action="store_true", + help="Save received message content to files <queuename>-receiver-<n>.msg") +op.add_option("--verbose", default=False, action="store_true", + help="Show commands executed") +op.add_option("--no-delete", default=False, action="store_true", + help="Don't delete the test queues.") +op.add_option("--fill-drain", default=False, action="store_true", + help="First fill the queues, then drain them") + +single_quote_re = re.compile("'") +def posix_quote(string): + """ Quote a string for use as an argument in a posix shell""" + return "'" + single_quote_re.sub("\\'", string) + "'"; + +def ssh_command(host, command): + """ Convert command into an ssh command on host with quoting""" + return ["ssh", host] + [posix_quote(arg) for arg in command] + +class Clients: + def __init__(self): self.clients=[] + + def add(self, client): + self.clients.append(client) + return client + + def kill(self): + for c in self.clients: + try: c.kill() + except: pass + +class PopenCommand(Popen): + """Like Popen but you can query for the command""" + def __init__(self, command, *args, **kwargs): + self.command = command + Popen.__init__(self, command, *args, **kwargs) + +clients = Clients() + +def start_receive(queue, index, opts, ready_queue, broker, host): + address_opts=opts.receive_option + if opts.durable: address_opts += ["node:{durable:true}"] + address="%s;{%s}"%(queue,",".join(address_opts)) + msg_total=opts.senders*opts.messages + messages = msg_total/opts.receivers; + if (index < msg_total%opts.receivers): messages += 1 + if (messages == 0): return None + command = ["qpid-jms-receive", + #"-b", broker, + "--ready-address", "benchmark-ready;{create:always}", + "-a", address, + "-m", str(messages), + "--forever", + "--print-content=no", + # "--receive-rate", str(opts.receive_rate), + "--report-total", + "--ack-frequency", str(opts.ack_frequency), + # "--ready-address", "%s;{create:always}"%ready_queue, + "--report-header=no -v" + ] + if opts.save_received: + command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] + command += opts.receive_arg + if opts.connection_options: + command += ["--connection-options",opts.connection_options] + if host: command = ssh_command(host, command) + if opts.verbose: print "Receiver: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) + +def start_send(queue, opts, broker, host): + address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) + command = ["qpid-jms-send", + #"-b", broker, + "-a", address, + "--messages", str(opts.messages), + "--content-size", str(opts.content_size), + "--send-rate", str(opts.send_rate), + "--report-total", + "--report-header=no", + "--timestamp=%s"%(opts.timestamp and "yes" or "no"), + "--sequence=%s"%(opts.sequence and "yes" or "no"), + "--durable", str(opts.durable) + ] + command += opts.send_arg + if opts.connection_options: + command += ["--connection-options",opts.connection_options] + if host: command = ssh_command(host, command) + if opts.verbose: print "Sender: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) + +def error_msg(out, err): + return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) + +def first_line(p): + out,err=p.communicate() + if p.returncode != 0: + raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) + + print str(out) + print str(err) + return out.split("\n")[0] + +def recreate_queues(queues, brokers, no_delete, opts): + c = qpid.messaging.Connection(brokers[0]) + c.open() + s = c.session() + for q in queues: + if not no_delete: + try: s.sender("%s;{delete:always}"%(q)).close() + except qpid.messaging.exceptions.NotFound: pass + address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) + if opts.verbose: print "Creating", address + s.sender(address) + c.close() + +def print_header(timestamp): + if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" + else: latency_header="" + print "send-tp\trecv-tp%s"%latency_header + +def parse(parser, lines): # Parse sender/receiver output + return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] + +def parse_senders(senders): + return parse([int],[first_line(p) for p in senders]) + +def parse_receivers(receivers): + return parse([int,float,float,float],[first_line(p) for p in receivers if p]) + +def print_data(send_stats, recv_stats, total_tp): + for send,recv in map(None, send_stats, recv_stats): + line="" + if send: line += "%d"%send[0] + if recv: + line += "\t%d"%recv[0] + if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + if total_tp is not None: + line += "\t%d"%total_tp + total_tp = None + print line + +def print_summary(send_stats, recv_stats, total_tp): + def avg(s): sum(s) / len(s) + send_tp = sum([l[0] for l in send_stats]) + recv_tp = sum([l[0] for l in recv_stats]) + summary = "%d\t%d"%(send_tp, recv_tp) + if recv_stats and len(recv_stats[0]) == 4: + l_min = sum(l[1] for l in recv_stats)/len(recv_stats) + l_max = sum(l[2] for l in recv_stats)/len(recv_stats) + l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) + summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) + summary += "\t%d"%total_tp + print summary + + +class ReadyReceiver: + """A receiver for ready messages""" + def __init__(self, queue, broker): + self.connection = qpid.messaging.Connection(broker) + self.connection.open() + self.receiver = self.connection.session().receiver( + "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) + self.receiver.session.sync() + self.timeout=10 + + def wait(self, receivers): + try: + for i in receivers: self.receiver.fetch(self.timeout) + self.connection.close() + except qpid.messaging.Empty: + for r in receivers: + if (r.poll() is not None): + out,err=r.communicate() + raise Exception("Receiver error: %s\n%s" % + (" ".join(r.command), error_msg(out,err))) + raise Exception("Timed out waiting for receivers to be ready") + +def flatten(l): + return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), []) + +class RoundRobin: + def __init__(self,items): + self.items = items + self.index = 0 + + def next(self): + if not self.items: return None + ret = self.items[self.index] + self.index = (self.index+1)%len(self.items) + return ret + +def main(): + opts, args = op.parse_args() + opts.client_host = flatten(opts.client_host) + if not opts.broker: + if opts.client_host: + raise Exception("--broker must be specified if --client_host is.") + opts.broker = ["127.0.0.1"] # Deafult to local broker + opts.broker = flatten(opts.broker) + brokers = RoundRobin(opts.broker) + client_hosts = RoundRobin(opts.client_host) + send_out = "" + receive_out = "" + ready_queue="%s-ready"%(opts.queue_name) + queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] + try: + for i in xrange(opts.repeat): + recreate_queues(queues, opts.broker, opts.no_delete, opts) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) + + def start_receivers(): + return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers) ] + + + def start_senders(): + return [ start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders) ] + + if opts.report_header and i == 0: print_header(opts.timestamp) + + if opts.fill_drain: + # First fill the queues, then drain them + start = time.time() + senders = start_senders() + for p in senders: p.wait() + receivers = start_receivers() + for p in receivers: p.wait() + else: + # Run senders and receivers in parallel + receivers = start_receivers() + ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready + start = time.time() + senders = start_senders() + for p in senders + receivers: p.wait() + + total_sent = opts.queues * opts.senders * opts.messages + total_tp = total_sent / (time.time()-start) + #send_stats=parse_senders(senders) + recv_stats=parse_receivers(receivers) + #if opts.summarize: print_summary(send_stats, recv_stats, total_tp) + #else: print_data(send_stats, recv_stats, total_tp) + finally: clients.kill() # No strays + +if __name__ == "__main__": main() + diff --git a/java/tools/bin/qpid-jms-receive b/java/tools/bin/qpid-jms-receive new file mode 100755 index 0000000000..57abe874ff --- /dev/null +++ b/java/tools/bin/qpid-jms-receive @@ -0,0 +1,193 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="qpid-jms-receive" +URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" +ADDRESS="queue;{create:always}" +TIMEOUT="0" +FOREVER="false" +MESSAGES="1" +IGNORE_DUPLICATES="false" +CHECK_REDELIVERED="false" +CAPACITY="1000" +ACK_FREQUENCY="100" +TX="0" +ROLLBACL_FREQUENCY="0" +PRINT_CONTENT="false" +PRINT_HEADERS="false" +REPORT_TOTAL="false" +REPORT_EVERY="0" +REPORT_HEADER="true" +READY_ADDRES="''" +EXTRA_JVM_ARGS="" +VERBOSE="0" + +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o b:a:f:m:vh\ + --long broker:,address:,timeout:,forever\ +,messages:,ignore-duplicates,check-redelivered\ +,capacity:,ack-frequency:,tx:,rollback-frequency:\ +,print-content:,print-headers:,report-total\ +,report-every:,report-header:,ready-address:\ +,jvm-args:,verbose,help -- "$@") + +# padding the option string with 4 spaces +# padding the desc string with 30 spaces +usage() +{ + printf "\n%s\n" "Usage: $PROGRAM_NAME [option].." + + printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to" + + printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from" + + printf "\n%25s\n%71s\n" "--timeout TIMEOUT (0)" "timeout in seconds to wait before exiting" + + printf "\n%17s\n%61s\n" "-f, --forever" "ignore timeout and wait forever" + + printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely" + + printf "\n%23s\n%84s\n" "--ignore-duplicates" "Detect and ignore duplicates (by checking 'sn' header)" + + printf "\n%23s\n%82s\n%92s\n" "--check-redelivered" "Fails with exception if a duplicate is not marked as" " redelivered (only relevant when ignore-duplicates is selected)" + + printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)" + + printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)" + + printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)" + + printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)" + + printf "\n%30s\n%55s\n" "--print-content yes|no (0)" "print out message content" + + printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers" + + printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics" + + printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages" + + printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report" + + printf "\n%27s\n%82s\n" "--ready-address ADDRESS" "send a message to this address when ready to receive" + + printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify" + + printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -b|--broker) + URL="$2"; shift; shift; continue + ;; + -a|--address) + ADDRESS="$2"; shift; shift; continue + ;; + --timeout) + TIMEOUT="$2"; shift; shift; continue + ;; + -f|--forever) + FOREVER="$2"; shift; shift; continue + ;; + -m|--messages) + MESSAGES="$2"; shift; shift; continue + ;; + --ignore-duplicates) + IGNORE_DUPLICATES="true"; shift; continue + ;; + --check-redelivered) + CHECK_REDELIVERED="true"; shift; continue + ;; + --capacity) + CAPACITY="$2"; shift; shift; continue + ;; + --ack-frequency) + ACK_FREQUENCY="$2"; shift; shift; continue + ;; + --tx) + TX="$2"; shift; shift; continue + ;; + --rollback-frequency) + ROLLBACK_FREQUENCY="$2"; shift; shift; continue + ;; + --print-content) + if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue + ;; + --print-headers) + if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue + ;; + --report-total) + REPORT_TOTAL="true"; shift; continue + ;; + --report-every) + REPORT_EVERY="$2"; shift; shift; continue + ;; + --report-header) + if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue + ;; + --ready-address) + READY_ADDRESS="$2"; shift; shift; continue + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -v|--verbose) + VERBOSE="1"; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +RECEIVER_ARGS="-server -Durl=$URL \ +-Daddress=$ADDRESS \ +-Dtimeout=$TIMEOUT \ +-Dmsg-count=$MESSAGES \ +-Dack-frequency=$ACK_FREQUENCY \ +-Dtx=$TX \ +-Drollback-frequnecy=$ROLLBACL_FREQUENCY \ +-Dprint-content=$PRINT_CONTENT \ +-Dprint-headers=$PRINT_HEADERS \ +-Dreport-total=$REPORT_TOTAL \ +-Dreport-every=$REPORT_EVERY \ +-Dreport-header=$REPORT_HEADER \ +-Dmax_prefetch=$CAPACITY " + +if [ "x$READY_ADDRESS" != "x" ]; then RECEIVER_ARGS="$RECEIVER_ARGS -Dready-address=$READY_ADDRESS"; fi +if [ "$VERBOSE" == "1" ]; then echo $RECEIVER_ARGS; fi +echo $RECEIVER_ARGS +$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $RECEIVER_ARGS org.apache.qpid.tools.QpidReceive diff --git a/java/tools/bin/qpid-jms-send b/java/tools/bin/qpid-jms-send new file mode 100755 index 0000000000..d7695924f0 --- /dev/null +++ b/java/tools/bin/qpid-jms-send @@ -0,0 +1,261 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="qpid-jms-send" +URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" +ADDRESS="queue;{create:always}" +MESSAGES="1" +ID="" +REPLY_TO="" +SEND_EOS="1" +DURABLE="false" +TTL="0" +PRIORITY="0" +PROPERTY="" +CORRELATION_ID="" +USER_ID="" +CONTENT_STRING="" +CONTENT_SIZE="1024" +CONTENT_MAP="" +CAPACITY="1000" +ACK_FREQUENCY="100" +TX="0" +ROLLBACL_FREQUENCY="0" +PRINT_CONTENT="true" +PRINT_HEADERS="false" +REPORT_TOTAL="false" +REPORT_EVERY="0" +REPORT_HEADER="true" +SEND_RATE="-1" +SEQUNCE="1" +DISABLE_TIMESTAMP="false" +EXTRA_JVM_ARGS="" +VERBOSE="0" + +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o b:a:m:i:P:M:vh\ + --long broker:,address:,messages:,id:,reply-to:\ +,send-eos:,durable:,ttl:,property:,correlational-id:\ +,user-id:,content-string:,content-size:,content-map:\ +,capacity:,ack-frequency:,tx:,rollback-frequency:\ +,print-content:,print-headers:,report-total\ +,report-every:,report-header:,send-rate:,sequence:,timestamp:\ +,jvm-args:,verbose,help -- "$@") + +# padding the option string with 4 spaces +# padding the desc string with 30 spaces +usage() +{ + printf "\n%s\n" "Usage: $PROGRAM_NAME [option].." + + printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to" + + printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from" + + printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely" + + printf "\n%15s\n%75s\n" "-i, --id ID" "use the supplied id instead of generating one" + + printf "\n%23s\n%54s\n" "--reply-to REPLY-TO" "specify reply-to address" + + printf "\n%20s\n%70s\n" "--send-eos N (0)" "send N EOS messages to mark end of input" + + printf "\n%24s\n%54s\n" "--durable yes|no (0)" "mark messages as durable" + + printf "\n%19s\n%72s\n" "--ttl msecs (0)" "time-to-live for messages, in milliseconds" + + printf "\n%27s\n%72s\n" "--priority PRIORITY (0)" "time-to-live for messages, in milliseconds" + + printf "\n%29s\n%54s\n" "-P, --property NAME=VALUE" "specify message property" + + printf "\n%23s\n%57s\n" "--correlation-id ID" "correlation-id for message" + + printf "\n%20s\n%48s\n" "--user-id USERID" "userid for message" + + printf "\n%28s\n%60s\n" "--content-string CONTENT" "use CONTENT as message content" + + printf "\n%24s\n%62s\n" "--content-size N (0)" "create an N-byte message content" + + printf "\n%32s\n%59s\n" "-M, --content-map NAME=VALUE" "specify entry for map content" + + printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)" + + printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)" + + printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)" + + printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)" + + printf "\n%30s\n%55s\n" "--print-content yes|no (1)" "print out message content" + + printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers" + + printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics" + + printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages" + + printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report" + + printf "\n%21s\n%64s\n%62s\n" "--send-rate N (0)" "Send at rate of N messages/second." "0 means send as fast as possible" + + printf "\n%25s\n%69s\n%77s\n" "--sequence yes|no (1)" "Add a sequence number messages property" "(required for duplicate/lost message detection)" + + printf "\n%26s\n%64s\n%77s\n" "--timestamp yes|no (1)" "Add a time stamp messages property" "(required for duplicate/lost message detection)" + + printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify" + + printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -b|--broker) + URL="$2"; shift; shift; continue + ;; + -a|--address) + ADDRESS="$2"; shift; shift; continue + ;; + -m|--messages) + MESSAGES="$2"; shift; shift; continue + ;; + -i|--id) + ID="$2"; shift; shift; continue + ;; + --reply-to) + REPLY_TO="$2"; shift; shift; continue + ;; + --send-eos) + SEND_EOS="$2"; shift; shift; continue + ;; + --durable) + if [ "$2" == "1" ]; then DURABLE="true"; else DURABLE="false"; fi; shift; shift; continue + ;; + --ttl) + TTL="$2"; shift; shift; continue + ;; + --priority) + PRIORITY="$2"; shift; shift; continue + ;; + -P|--property) + PROPERTY="$2,$PROPERTY"; shift; shift; continue + ;; + --correlation-id) + CORRELATION_ID="$2"; shift; shift; continue + ;; + --user-id) + USER_ID="$2"; shift; shift; continue + ;; + --content-string) + CONTENT_STRING="$2"; shift; shift; continue + ;; + --content-size) + CONTENT_SIZE="$2"; shift; shift; continue + ;; + -M|--content-map) + CONTENT_MAP="$2,$CONTENT_MAP"; shift; shift; continue + ;; + --capacity) + CAPACITY="$2"; shift; shift; continue + ;; + --ack-frequency) + ACK_FREQUENCY="$2"; shift; shift; continue + ;; + --tx) + TX="$2"; shift; shift; continue + ;; + --rollback-frequency) + ROLLBACK_FREQUENCY="$2"; shift; shift; continue + ;; + --print-content) + if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue + ;; + --print-headers) + if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue + ;; + --report-total) + REPORT_TOTAL="true"; shift; continue + ;; + --report-every) + REPORT_EVERY="$2"; shift; shift; continue + ;; + --report-header) + if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue + ;; + --send-rate) + SEND_RATE="$2"; shift; shift; continue + ;; + --sequence) + if [ "$2" == "yes" ]; then SEQUENCE="true"; else SEQUENCE="false"; fi; shift; shift; continue + ;; + --timestamp) + if [ "$2" == "yes" ]; then DISABLE_TIMESTAMP="false"; else DISABLE_TIMESTAMP="true"; fi; shift; shift; continue + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -v|--verbose) + VERBOSE="1"; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +SENDER_ARGS="-server -Durl=$URL \ +-Daddress=$ADDRESS \ +-Dmsg-count=$MESSAGES \ +-Dsend-eos=$SEND_EOS \ +-Ddurable=$DURABLE \ +-Dmsg_size=$CONTENT_SIZE \ +-Dsend-rate=$SEND_RATE \ +-Ddisable-timestamp=$DISABLE_TIMESTAMP \ +-Dttl=$TTL \ +-Dpriority=$PRIORITY \ +-Dtx=$TX \ +-Drollback-frequnecy=$ROLLBACK_FREQUENCY \ +-Dprint-content=$PRINT_CONTENT \ +-Dprint-headers=$PRINT_HEADERS \ +-Dreport-total=$REPORT_TOTAL \ +-Dreport-every=$REPORT_EVERY \ +-Dreport-header=$REPORT_HEADER \ +-Dmax_prefetch=$CAPACITY " + +if [ "x$ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Did=$ID"; fi +if [ "x$USER_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Duser_id=$USER_ID"; fi +if [ "x$CORRELATION_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Dcorrelation_id=$CORRELATION_ID"; fi + +if [ "$VERBOSE" == "1" ]; then echo $SENDER_ARGS; fi +$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $SENDER_ARGS org.apache.qpid.tools.QpidSend diff --git a/java/tools/build.xml b/java/tools/build.xml index 7cd1b1172c..99b0375e95 100644 --- a/java/tools/build.xml +++ b/java/tools/build.xml @@ -7,9 +7,9 @@ - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - + - - http://www.apache.org/licenses/LICENSE-2.0 - - + - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,7 +19,6 @@ - --> <project name="Qpid Tools" default="build"> - <property name="module.depends" value="client common"/> <import file="../module.xml"/> diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java index dbc73c404f..de7748acd6 100644 --- a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java +++ b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java @@ -1,4 +1,3 @@ -package org.apache.qpid.testkit; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.testkit; * under the License. * */ +package org.apache.qpid.testkit; public interface ErrorHandler { diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java index 4e79dd62a8..7eb83a520b 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.tools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * In the future this will be replaced by a Clock abstraction * that can utilize a realtime clock when running in RT Java. @@ -27,6 +30,8 @@ package org.apache.qpid.tools; public class Clock { + private static final Logger _logger = LoggerFactory.getLogger(Clock.class); + public final static long SEC = 60000; private static Precision precision; @@ -54,7 +59,11 @@ public class Clock precision = Precision.getPrecision(System.getProperty("precision","mili")); //offset = Long.getLong("offset",-1); - System.out.println("Using precision : " + precision + " and offset " + offset); + if (_logger.isDebugEnabled()) + { + System.out.println("Using precision : " + precision ); + //+ " and offset " + offset); + } } public static Precision getPrecision() diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java index c6abdf6c84..e0e48519f3 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java @@ -29,383 +29,422 @@ import org.apache.qpid.client.AMQConnection; public class JVMArgConfiguration implements TestConfiguration { - /* - * By default the connection URL is used. - * This allows a user to easily specify a fully fledged URL any given property. - * Ex. SSL parameters - * - * By providing a host & port allows a user to simply override the URL. - * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. - */ - private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - private String host = ""; + private String host = ""; - private int port = -1; + private int port = -1; - private String address = "queue; {create : always}"; + private String address = "queue; {create : always}"; - private int msg_size = 1024; + private long timeout = 0; - private int random_msg_size_start_from = 1; + private int msg_size = 1024; - private boolean cacheMessage = false; + private int random_msg_size_start_from = 1; - private boolean disableMessageID = false; + private boolean cacheMessage = false; - private boolean disableTimestamp = false; + private boolean disableMessageID = false; - private boolean durable = false; + private boolean disableTimestamp = false; - private int transaction_size = 0; + private boolean durable = false; - private int ack_mode = Session.AUTO_ACKNOWLEDGE; + private int transaction_size = 0; - private int msg_count = 10; + private int ack_mode = Session.AUTO_ACKNOWLEDGE; - private int warmup_count = 1; + private int msg_count = 10; - private boolean random_msg_size = false; + private int warmup_count = 1; - private String msgType = "bytes"; + private boolean random_msg_size = false; - private boolean printStdDev = false; + private String msgType = "bytes"; - private int sendRate = 0; + private boolean printStdDev = false; - private boolean externalController = false; + private int sendRate = 0; - private boolean useUniqueDest = false; // useful when using multiple connections. + private boolean externalController = false; - private int ackFrequency = 100; + private boolean useUniqueDest = false; // useful when using multiple connections. - private DecimalFormat df = new DecimalFormat("###.##"); + private int ackFrequency = 100; - private int reportEvery = 0; + private DecimalFormat df = new DecimalFormat("###.##"); - private boolean isReportTotal = false; - - private boolean isReportHeader = true; + private int reportEvery = 0; - private boolean isReportLatency = false; - - private int sendEOS = 0; - - private int connectionCount = 1; - - private int rollbackFrequency = 0; - - private boolean printHeaders; - - public JVMArgConfiguration() - { - - url = System.getProperty("url",url); - host = System.getProperty("host",""); - port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); - - msg_size = Integer.getInteger("msg-size", 1024); - cacheMessage = Boolean.getBoolean("cache-msg"); - disableMessageID = Boolean.getBoolean("disable-message-id"); - disableTimestamp = Boolean.getBoolean("disable-timestamp"); - durable = Boolean.getBoolean("durable"); - transaction_size = Integer.getInteger("tx",1000); - ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg-count",msg_count); - warmup_count = Integer.getInteger("warmup-count",warmup_count); - random_msg_size = Boolean.getBoolean("random-msg-size"); - msgType = System.getProperty("msg-type","bytes"); - printStdDev = Boolean.getBoolean("print-std-dev"); - sendRate = Integer.getInteger("rate",0); - externalController = Boolean.getBoolean("ext-controller"); - useUniqueDest = Boolean.getBoolean("use-unique-dest"); - random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1); - reportEvery = Integer.getInteger("report-every"); - isReportTotal = Boolean.getBoolean("report-total"); - isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header"); - isReportLatency = Boolean.getBoolean("report-latency"); - sendEOS = Integer.getInteger("send-eos"); - connectionCount = Integer.getInteger("con_count",1); - ackFrequency = Integer.getInteger("ack-frequency"); - rollbackFrequency = Integer.getInteger("rollback-frequency"); - printHeaders = Boolean.getBoolean("print-headers"); - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getUrl() - */ - @Override - public String getUrl() - { - return url; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getHost() - */ - @Override - public String getHost() - { - return host; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getPort() - */ - @Override - public int getPort() - { - return port; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAddress() - */ - @Override - public String getAddress() - { - return address; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAckMode() - */ - @Override - public int getAckMode() - { - return ack_mode; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMsgCount() - */ - @Override - public int getMsgCount() - { - return msg_count; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMsgSize() - */ - @Override - public int getMsgSize() - { - return msg_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom() - */ - @Override - public int getRandomMsgSizeStartFrom() - { - return random_msg_size_start_from; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDurable() - */ - @Override - public boolean isDurable() - { - return durable; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isTransacted() - */ - @Override - public boolean isTransacted() - { - return transaction_size > 0; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize() - */ - @Override - public int getTransactionSize() - { - return transaction_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount() - */ - @Override - public int getWarmupCount() - { - return warmup_count; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage() - */ - @Override - public boolean isCacheMessage() - { - return cacheMessage; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID() - */ - @Override - public boolean isDisableMessageID() - { - return disableMessageID; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp() - */ - @Override - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize() - */ - @Override - public boolean isRandomMsgSize() - { - return random_msg_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMessageType() - */ - @Override - public String getMessageType() - { - return msgType; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev() - */ - @Override - public boolean isPrintStdDev() - { - return printStdDev; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getSendRate() - */ - @Override - public int getSendRate() - { - return sendRate; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isExternalController() - */ - @Override - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests() - */ - @Override - public boolean isUseUniqueDests() - { - return useUniqueDest; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency() - */ - @Override - public int getAckFrequency() - { - return ackFrequency; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#createConnection() - */ - @Override - public Connection createConnection() throws Exception - { - if (getHost().equals("") || getPort() == -1) - { - return new AMQConnection(getUrl()); - } - else - { - return new AMQConnection(getHost(),getPort(),"guest","guest","test","test"); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat() - */ - @Override - public DecimalFormat getDecimalFormat() - { - return df; - } - - @Override - public int reportEvery() - { - return reportEvery; - } - - @Override - public boolean isReportTotal() - { - return isReportTotal; - } - - @Override - public boolean isReportHeader() - { - return isReportHeader; - } - - @Override - public boolean isReportLatency() - { - return isReportLatency; - } - - @Override - public int getSendEOS() - { - return sendEOS; - } - - @Override - public int getConnectionCount() - { - return connectionCount; - } - - @Override - public int getRollbackFrequency() - { - return rollbackFrequency; - } - - @Override - public boolean isPrintHeaders() - { - return printHeaders; - } + private boolean isReportTotal = false; + + private boolean isReportHeader = true; + + private int sendEOS = 0; + + private int connectionCount = 1; + + private int rollbackFrequency = 0; + + private boolean printHeaders; + + private boolean printContent; + + private long ttl; + + private int priority; + + private String readyAddress; + + public JVMArgConfiguration() + { + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address",address); + + timeout = Long.getLong("timeout",0); + msg_size = Integer.getInteger("msg-size", 0); + cacheMessage = true; //Boolean.getBoolean("cache-msg"); + disableMessageID = Boolean.getBoolean("disable-message-id"); + disableTimestamp = Boolean.getBoolean("disable-timestamp"); + durable = Boolean.getBoolean("durable"); + transaction_size = Integer.getInteger("tx",1000); + ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg-count",msg_count); + warmup_count = Integer.getInteger("warmup-count",warmup_count); + random_msg_size = Boolean.getBoolean("random-msg-size"); + msgType = System.getProperty("msg-type","bytes"); + printStdDev = Boolean.getBoolean("print-std-dev"); + sendRate = Integer.getInteger("rate",0); + externalController = Boolean.getBoolean("ext-controller"); + useUniqueDest = Boolean.getBoolean("use-unique-dest"); + random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1); + reportEvery = Integer.getInteger("report-every",0); + isReportTotal = Boolean.getBoolean("report-total"); + isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header"); + sendEOS = Integer.getInteger("send-eos",1); + connectionCount = Integer.getInteger("con_count",1); + ackFrequency = Integer.getInteger("ack-frequency",100); + rollbackFrequency = Integer.getInteger("rollback-frequency",0); + printHeaders = Boolean.getBoolean("print-headers"); + printContent = Boolean.getBoolean("print-content"); + ttl = Long.getLong("ttl", 0); + priority = Integer.getInteger("priority", 0); + readyAddress = System.getProperty("ready-address"); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getUrl() + */ + @Override + public String getUrl() + { + return url; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getHost() + */ + @Override + public String getHost() + { + return host; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getPort() + */ + @Override + public int getPort() + { + return port; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAddress() + */ + @Override + public String getAddress() + { + return address; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getTimeout() + */ + @Override + public long getTimeout() + { + return timeout; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckMode() + */ + @Override + public int getAckMode() + { + return ack_mode; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgCount() + */ + @Override + public int getMsgCount() + { + return msg_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgSize() + */ + @Override + public int getMsgSize() + { + return msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom() + */ + @Override + public int getRandomMsgSizeStartFrom() + { + return random_msg_size_start_from; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDurable() + */ + @Override + public boolean isDurable() + { + return durable; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isTransacted() + */ + @Override + public boolean isTransacted() + { + return transaction_size > 0; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize() + */ + @Override + public int getTransactionSize() + { + return transaction_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount() + */ + @Override + public int getWarmupCount() + { + return warmup_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage() + */ + @Override + public boolean isCacheMessage() + { + return cacheMessage; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID() + */ + @Override + public boolean isDisableMessageID() + { + return disableMessageID; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp() + */ + @Override + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize() + */ + @Override + public boolean isRandomMsgSize() + { + return random_msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMessageType() + */ + @Override + public String getMessageType() + { + return msgType; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev() + */ + @Override + public boolean isPrintStdDev() + { + return printStdDev; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getSendRate() + */ + @Override + public int getSendRate() + { + return sendRate; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isExternalController() + */ + @Override + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests() + */ + @Override + public boolean isUseUniqueDests() + { + return useUniqueDest; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency() + */ + @Override + public int getAckFrequency() + { + return ackFrequency; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#createConnection() + */ + @Override + public Connection createConnection() throws Exception + { + if (getHost().equals("") || getPort() == -1) + { + return new AMQConnection(getUrl()); + } + else + { + return new AMQConnection(getHost(),getPort(),"guest","guest","test","test"); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat() + */ + @Override + public DecimalFormat getDecimalFormat() + { + return df; + } + + @Override + public int reportEvery() + { + return reportEvery; + } + + @Override + public boolean isReportTotal() + { + return isReportTotal; + } + + @Override + public boolean isReportHeader() + { + return isReportHeader; + } + + @Override + public int getSendEOS() + { + return sendEOS; + } + + @Override + public int getConnectionCount() + { + return connectionCount; + } + + @Override + public int getRollbackFrequency() + { + return rollbackFrequency; + } + + @Override + public boolean isPrintHeaders() + { + return printHeaders; + } + + @Override + public boolean isPrintContent() + { + return printContent; + } + + @Override + public long getTTL() + { + return ttl; + } + + @Override + public int getPriority() + { + return priority; + } + + @Override + public String getReadyAddress() + { + return readyAddress; + } } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java index 8ab1379fce..a0ba928e1f 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java @@ -1,4 +1,3 @@ -package org.apache.qpid.tools; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.qpid.tools; * under the License. * */ +package org.apache.qpid.tools; import javax.jms.BytesMessage; diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java index 02f011f1b9..6dd8b7e1ca 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java @@ -28,10 +28,12 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.tools.TestConfiguration.MessageType; import org.apache.qpid.tools.report.BasicReporter; import org.apache.qpid.tools.report.Reporter; @@ -42,140 +44,164 @@ import org.slf4j.LoggerFactory; public class QpidReceive implements MessageListener { - private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); - private final CountDownLatch testCompleted = new CountDownLatch(1); - - private Connection con; - private Session session; - private Destination dest; - private MessageConsumer consumer; - private boolean transacted = false; - private boolean isRollback = false; - private int txSize = 0; - private int rollbackFrequency = 0; - private int ackFrequency = 0; - private int expected = 0; - private int received = 0; - private Reporter report; - private TestConfiguration config; - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) - { - this(report,config, con, dest, UUID.randomUUID().toString()); - } - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) - { - //System.out.println("Producer ID : " + id); - this.report = report; - this.config = config; - this.con = con; - this.dest = dest; - } - - public void setUp() throws Exception - { - if (config.isTransacted()) - { - session = con.createSession(true, Session.SESSION_TRANSACTED); - } - else if (config.getAckFrequency() > 0) - { - session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - } - else - { - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); - - transacted = config.isTransacted(); - txSize = config.getTransactionSize(); - isRollback = config.getRollbackFrequency() > 0; - rollbackFrequency = config.getRollbackFrequency(); - ackFrequency = config.getAckFrequency(); - } - - public void resetCounters() - { - received = 0; - expected = 0; - report.clear(); - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && - TestConfiguration.EOS.equals(((TextMessage)msg).getText())) - { - testCompleted.countDown(); - return; - } - - received++; - report.message(msg); - - if (transacted && (received % txSize == 0)) - { - if (isRollback && (received % rollbackFrequency == 0)) - { - session.rollback(); - } - else - { - session.commit(); - } - } - else if (ackFrequency > 0) - { - msg.acknowledge(); - } - - if (expected >= received) - { - testCompleted.countDown(); - } - - } - catch(Exception e) - { - _logger.error("Error when receiving messages",e); - } - - } - - public void waitforCompletion(int expected) throws Exception - { - this.expected = expected; - testCompleted.await(); - } - - public void tearDown() throws Exception - { - session.close(); - } - - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class, - System.out, - config.reportEvery(), - config.isReportHeader() - ); - Destination dest = AMQDestination.createDestination(config.getAddress()); - QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); - receiver.setUp(); - receiver.waitforCompletion(config.getMsgCount()); - if (config.isReportTotal()) - { - reporter.report(); - } - receiver.tearDown(); - } + private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class); + private final CountDownLatch testCompleted = new CountDownLatch(1); + + private Connection con; + private Session session; + private Destination dest; + private MessageConsumer consumer; + private boolean transacted = false; + private boolean isRollback = false; + private int txSize = 0; + private int rollbackFrequency = 0; + private int ackFrequency = 0; + private int expected = 0; + private int received = 0; + private Reporter report; + private TestConfiguration config; + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + con.start(); + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else if (config.getAckFrequency() > 0) + { + session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + if (_logger.isDebugEnabled()) + { + System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); + } + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + isRollback = config.getRollbackFrequency() > 0; + rollbackFrequency = config.getRollbackFrequency(); + ackFrequency = config.getAckFrequency(); + + _logger.debug("Ready address : " + config.getReadyAddress()); + if (config.getReadyAddress() != null) + { + MessageProducer prod = session.createProducer(AMQDestination + .createDestination(config.getReadyAddress())); + prod.send(session.createMessage()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending message to ready address " + prod.getDestination()); + } + } + } + + public void resetCounters() + { + received = 0; + expected = 0; + report.clear(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && + TestConfiguration.EOS.equals(((TextMessage)msg).getText())) + { + testCompleted.countDown(); + return; + } + + received++; + report.message(msg); + + if (config.isPrintHeaders()) + { + System.out.println(((AbstractJMSMessage)msg).toHeaderString()); + } + + if (config.isPrintContent()) + { + System.out.println(((AbstractJMSMessage)msg).toBodyString()); + } + + if (transacted && (received % txSize == 0)) + { + if (isRollback && (received % rollbackFrequency == 0)) + { + session.rollback(); + } + else + { + session.commit(); + } + } + else if (ackFrequency > 0) + { + msg.acknowledge(); + } + + if (received >= expected) + { + testCompleted.countDown(); + } + + } + catch(Exception e) + { + _logger.error("Error when receiving messages",e); + } + } + + public void waitforCompletion(int expected) throws Exception + { + this.expected = expected; + testCompleted.await(); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(ThroughputAndLatency.class, + System.out, + config.reportEvery(), + config.isReportHeader()); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); + receiver.setUp(); + receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS()); + if (config.isReportTotal()) + { + reporter.report(); + } + receiver.tearDown(); + } } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java index c058b83d41..3d321dcade 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java @@ -43,249 +43,261 @@ import org.slf4j.LoggerFactory; public class QpidSend { - private Connection con; - private Session session; - private Destination dest; - private MessageProducer producer; - private MessageType msgType; - private Message msg; - private Object payload; - private List<Object> payloads; - private boolean cacheMsg = false; - private boolean randomMsgSize = false; - private boolean durable = false; - private Random random; - private int msgSizeRange = 1024; - private int totalMsgCount = 0; - private boolean rateLimitProducer = false; - private boolean transacted = false; - private int txSize = 0; + private Connection con; + private Session session; + private Destination dest; + private MessageProducer producer; + private MessageType msgType; + private Message msg; + private Object payload; + private List<Object> payloads; + private boolean cacheMsg = false; + private boolean randomMsgSize = false; + private boolean durable = false; + private Random random; + private int msgSizeRange = 1024; + private int totalMsgCount = 0; + private boolean rateLimitProducer = false; + private boolean transacted = false; + private int txSize = 0; - private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); - Reporter report; - TestConfiguration config; + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + Reporter report; + TestConfiguration config; - public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) - { - this(report,config, con, dest, UUID.randomUUID().toString()); - } + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } - public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) - { - //System.out.println("Producer ID : " + id); - this.report = report; - this.config = config; - this.con = con; - this.dest = dest; - } + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } - public void setUp() throws Exception - { - if (config.isTransacted()) - { - session = con.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } + public void setUp() throws Exception + { + con.start(); + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } - durable = config.isDurable(); - rateLimitProducer = config.getSendRate() > 0 ? true : false; - if (_logger.isDebugEnabled() && rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); - } + durable = config.isDurable(); + rateLimitProducer = config.getSendRate() > 0 ? true : false; + if (_logger.isDebugEnabled() && rateLimitProducer) + { + _logger.debug("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); + } - transacted = config.isTransacted(); - txSize = config.getTransactionSize(); + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); - msgType = MessageType.getType(config.getMessageType()); - // if message caching is enabled we pre create the message - // else we pre create the payload - if (config.isCacheMessage()) - { - cacheMsg = true; - msg = createMessage(createPayload(config.getMsgSize())); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (config.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = config.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); + msgType = MessageType.getType(config.getMessageType()); + // if message caching is enabled we pre create the message + // else we pre create the payload + if (config.isCacheMessage()) + { + cacheMsg = true; + msg = createMessage(createPayload(config.getMsgSize())); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (config.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = config.getMsgSize(); + payloads = new ArrayList<Object>(msgSizeRange); - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(createPayload(i)); - } - } - else - { - payload = createPayload(config.getMsgSize()); - } + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(createPayload(i)); + } + } + else + { + payload = createPayload(config.getMsgSize()); + } - producer = session.createProducer(dest); - if (_logger.isDebugEnabled()) - { - System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); - } - producer.setDisableMessageID(config.isDisableMessageID()); - producer.setDisableMessageTimestamp(config.isDisableTimestamp()); - } + producer = session.createProducer(dest); + if (_logger.isDebugEnabled()) + { + _logger.debug("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); + } + producer.setDisableMessageID(config.isDisableMessageID()); + //we add a separate timestamp to allow interoperability with other clients. + producer.setDisableMessageTimestamp(true); + if (config.getTTL() > 0) + { + producer.setTimeToLive(config.getTTL()); + } + if (config.getPriority() > 0) + { + producer.setPriority(config.getPriority()); + } + } - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } - } + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } + } - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - Message m; + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + Message m; - if (!randomMsgSize) - { - m = createMessage(payload); - } - else - { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); - } - m.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return m; - } - } + if (!randomMsgSize) + { + m = createMessage(payload); + } + else + { + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + } + m.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return m; + } + } - public void commit() throws Exception - { - session.commit(); - } + public void commit() throws Exception + { + session.commit(); + } - public void send() throws Exception - { - send(config.getMsgCount()); - } + public void send() throws Exception + { + send(config.getMsgCount()); + } - public void send(int count) throws Exception - { - int sendRate = config.getSendRate(); - if (rateLimitProducer) - { - int iterations = count/sendRate; - int remainder = count%sendRate; - for (int i=0; i < iterations; i++) - { - long iterationStart = Clock.getTime(); - sendMessages(sendRate); - long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs(); - long diff = Clock.SEC - elapsed; - if (diff > 0) - { - // We have sent more messages in a sec than specified by the rate. - Thread.sleep(diff); - } - } - sendMessages(remainder); - } - else - { - sendMessages(count); - } - } + public void send(int count) throws Exception + { + int sendRate = config.getSendRate(); + if (rateLimitProducer) + { + int iterations = count/sendRate; + int remainder = count%sendRate; + for (int i=0; i < iterations; i++) + { + long iterationStart = System.currentTimeMillis(); + sendMessages(sendRate); + long elapsed = System.currentTimeMillis() - iterationStart; + long diff = Clock.SEC - elapsed; + if (diff > 0) + { + // We have sent more messages in a sec than specified by the rate. + Thread.sleep(diff); + } + } + sendMessages(remainder); + } + else + { + sendMessages(count); + } + } - private void sendMessages(int count) throws Exception - { - boolean isTimestamp = config.isReportLatency(); - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - if (isTimestamp) - { - msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime()); - } - producer.send(msg); - report.message(msg); - totalMsgCount++; + private void sendMessages(int count) throws Exception + { + boolean isTimestamp = !config.isDisableTimestamp(); + long s = System.currentTimeMillis(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + if (isTimestamp) + { + msg.setLongProperty(TestConfiguration.TIMESTAMP, System.currentTimeMillis()); + } + producer.send(msg); + //report.message(msg); + totalMsgCount++; - if ( transacted && ((totalMsgCount) % txSize == 0)) - { - session.commit(); - } - } - } + if ( transacted && ((totalMsgCount) % txSize == 0)) + { + session.commit(); + } + } + long e = System.currentTimeMillis() - s; + //System.out.println("Rate : " + totalMsgCount/e); + } - public void resetCounters() - { - totalMsgCount = 0; - report.clear(); - } + public void resetCounters() + { + totalMsgCount = 0; + report.clear(); + } - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty(TestConfiguration.EOS, true); - producer.send(msg); - } - - public void tearDown() throws Exception - { - session.close(); - } + public void sendEndMessage() throws Exception + { + Message msg = session.createTextMessage(TestConfiguration.EOS); + producer.send(msg); + } - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - Reporter reporter = new BasicReporter(Throughput.class, - System.out, - config.reportEvery(), - config.isReportHeader() - ); - Destination dest = AMQDestination.createDestination(config.getAddress()); - QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); - sender.setUp(); - sender.send(); - if (config.getSendEOS() > 0) - { - sender.sendEndMessage(); - } - if (config.isReportTotal()) - { - reporter.report(); - } - sender.tearDown(); - } + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); + sender.setUp(); + sender.send(); + if (config.getSendEOS() > 0) + { + sender.sendEndMessage(); + } + if (config.isReportTotal()) + { + reporter.report(); + } + sender.tearDown(); + } } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java index 7f7df0e5e6..18870bac59 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java @@ -26,20 +26,20 @@ import javax.jms.Connection; public interface TestConfiguration { - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) { return MAP; } @@ -47,80 +47,88 @@ public interface TestConfiguration { return OBJECT; }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + public final static String TIMESTAMP = "ts"; + + public final static String EOS = "eos"; + + public final static String SEQUENCE_NUMBER = "sn"; + + public String getUrl(); - public final static String TIMESTAMP = "ts"; + public String getHost(); - public final static String EOS = "eos"; + public int getPort(); - public final static String SEQUENCE_NUMBER = "sn"; + public String getAddress(); - public String getUrl(); + public long getTimeout(); - public String getHost(); + public int getAckMode(); - public int getPort(); + public int getMsgCount(); - public String getAddress(); + public int getMsgSize(); - public int getAckMode(); + public int getRandomMsgSizeStartFrom(); - public int getMsgCount(); + public boolean isDurable(); - public int getMsgSize(); + public boolean isTransacted(); - public int getRandomMsgSizeStartFrom(); + public int getTransactionSize(); - public boolean isDurable(); + public int getWarmupCount(); - public boolean isTransacted(); + public boolean isCacheMessage(); - public int getTransactionSize(); + public boolean isDisableMessageID(); - public int getWarmupCount(); + public boolean isDisableTimestamp(); - public boolean isCacheMessage(); + public boolean isRandomMsgSize(); - public boolean isDisableMessageID(); + public String getMessageType(); - public boolean isDisableTimestamp(); + public boolean isPrintStdDev(); - public boolean isRandomMsgSize(); + public int getSendRate(); - public String getMessageType(); + public boolean isExternalController(); - public boolean isPrintStdDev(); + public boolean isUseUniqueDests(); - public int getSendRate(); + public int getAckFrequency(); - public boolean isExternalController(); + public Connection createConnection() throws Exception; - public boolean isUseUniqueDests(); + public DecimalFormat getDecimalFormat(); - public int getAckFrequency(); + public int reportEvery(); - public Connection createConnection() throws Exception; + public boolean isReportTotal(); - public DecimalFormat getDecimalFormat(); + public boolean isReportHeader(); - public int reportEvery(); + public int getSendEOS(); - public boolean isReportTotal(); + public int getConnectionCount(); - public boolean isReportHeader(); + public int getRollbackFrequency(); - public boolean isReportLatency(); + public boolean isPrintHeaders(); - public int getSendEOS(); + public boolean isPrintContent(); - public int getConnectionCount(); + public long getTTL(); - public int getRollbackFrequency(); + public int getPriority(); - public boolean isPrintHeaders(); + public String getReadyAddress(); }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java index 73efd1f1e0..db8b4ddcee 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java @@ -25,115 +25,121 @@ import java.text.DecimalFormat; import javax.jms.Message; +import org.apache.qpid.tools.TestConfiguration; + public interface Statistics { - public void message(Message msg); - public void report(PrintStream out); - public void header(PrintStream out); - public void clear(); - - static class Throughput implements Statistics - { - DecimalFormat df = new DecimalFormat("###.##"); - int messages = 0; - long start = 0; - boolean started = false; - - @Override - public void message(Message msg) - { - ++messages; - if (!started) - { - start = System.currentTimeMillis(); - started = true; - } - } - - @Override - public void report(PrintStream out) - { - long elapsed = System.currentTimeMillis() - start; - out.print(df.format((double)messages/(double)elapsed)); - } - - @Override - public void header(PrintStream out) - { - out.print("tp(m/s)"); - } - - public void clear() - { - messages = 0; - start = 0; - started = false; - } - } - - static class ThroughputAndLatency extends Throughput - { - long minLatency = Long.MAX_VALUE; - long maxLatency = Long.MIN_VALUE; - double totalLatency = 0; - int sampleCount = 0; - - @Override - public void message(Message msg) - { - super.message(msg); - try - { - long ts = msg.getLongProperty("ts"); - long latency = System.currentTimeMillis() - ts; - minLatency = Math.min(latency, minLatency); - maxLatency = Math.min(latency, maxLatency); - totalLatency = totalLatency + latency; - sampleCount++; - } - catch(Exception e) - { - System.out.println("Error calculating latency"); - } - } - - @Override - public void report(PrintStream out) - { - super.report(out); - double avgLatency = totalLatency/(double)sampleCount; - out.append('\t') - .append(String.valueOf(minLatency)) - .append('\t') - .append(String.valueOf(maxLatency)) - .append('\t') - .append(df.format(avgLatency)); - - out.flush(); - } - - @Override - public void header(PrintStream out) - { - super.header(out); - out.append('\t') - .append("l-min") - .append('\t') - .append("l-max") - .append('\t') - .append("l-avg"); - - out.flush(); - } - - public void clear() - { - super.clear(); - minLatency = 0; - maxLatency = 0; - totalLatency = 0; - sampleCount = 0; - } - } + public void message(Message msg); + public void report(PrintStream out); + public void header(PrintStream out); + public void clear(); + + static class Throughput implements Statistics + { + DecimalFormat df = new DecimalFormat("###"); + int messages = 0; + long start = 0; + boolean started = false; + + @Override + public void message(Message msg) + { + ++messages; + if (!started) + { + start = System.currentTimeMillis(); + started = true; + } + } + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + out.println(df.format((double)messages/(double)elapsed)); + } + + @Override + public void header(PrintStream out) + { + out.println("tp(m/s)"); + } + + public void clear() + { + messages = 0; + start = 0; + started = false; + } + } + + static class ThroughputAndLatency extends Throughput + { + long minLatency = Long.MAX_VALUE; + long maxLatency = Long.MIN_VALUE; + double totalLatency = 0; + int sampleCount = 0; + + @Override + public void message(Message msg) + { + super.message(msg); + try + { + long ts = msg.getLongProperty(TestConfiguration.TIMESTAMP); + long latency = System.currentTimeMillis() - ts; + minLatency = Math.min(latency, minLatency); + maxLatency = Math.max(latency, maxLatency); + totalLatency = totalLatency + latency; + sampleCount++; + } + catch(Exception e) + { + System.out.println("Error calculating latency " + e); + } + } + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + double rate = (double)messages/(double)elapsed; + double avgLatency = totalLatency/(double)sampleCount; + out.append("\n") + .append(df.format(rate)) + .append('\t') + .append(String.valueOf(minLatency)) + .append('\t') + .append(String.valueOf(maxLatency)) + .append('\t') + .append(df.format(avgLatency)) + .append("\n"); + + out.flush(); + } + + @Override + public void header(PrintStream out) + { + out.append("tp(m/s)") + .append('\t') + .append("l-min") + .append('\t') + .append("l-max") + .append('\t') + .append("l-avg"); + + out.flush(); + } + + public void clear() + { + super.clear(); + minLatency = 0; + maxLatency = 0; + totalLatency = 0; + sampleCount = 0; + } + } } |