summaryrefslogtreecommitdiff
path: root/java/tools
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /java/tools
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools')
-rwxr-xr-xjava/tools/bin/Profile-run-from-source2
-rwxr-xr-x[-rw-r--r--]java/tools/bin/mercury-controller0
-rwxr-xr-x[-rw-r--r--]java/tools/bin/mercury-start-consumers0
-rwxr-xr-x[-rw-r--r--]java/tools/bin/mercury-start-producers0
-rwxr-xr-xjava/tools/bin/qpid-jms-benchmark316
-rwxr-xr-xjava/tools/bin/qpid-jms-receive193
-rwxr-xr-xjava/tools/bin/qpid-jms-send261
-rw-r--r--java/tools/build.xml5
-rw-r--r--java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java2
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/Clock.java11
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java749
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java2
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java296
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java460
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java118
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java222
16 files changed, 1753 insertions, 884 deletions
diff --git a/java/tools/bin/Profile-run-from-source b/java/tools/bin/Profile-run-from-source
index f8ec45ccff..179c365450 100755
--- a/java/tools/bin/Profile-run-from-source
+++ b/java/tools/bin/Profile-run-from-source
@@ -49,7 +49,7 @@ export LOG_CONFIG="-Dlog4j.configuration=file:///$QPID_CHECKOUT/java/tools/etc/t
#------------- Required for qpid-python-testkit -----------------------------------------
-PYTHONPATH=$QPID_CHECKOUT/python/qpid:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH
+PYTHONPATH=$QPID_CHECKOUT/python:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH
export PATH=$QPID_CHECKOUT/python:$PATH
if [ -x $QPID_CHECKOUT/cpp/src/qpidd ]; then
diff --git a/java/tools/bin/mercury-controller b/java/tools/bin/mercury-controller
index fab8614039..fab8614039 100644..100755
--- a/java/tools/bin/mercury-controller
+++ b/java/tools/bin/mercury-controller
diff --git a/java/tools/bin/mercury-start-consumers b/java/tools/bin/mercury-start-consumers
index c71fc0c21f..c71fc0c21f 100644..100755
--- a/java/tools/bin/mercury-start-consumers
+++ b/java/tools/bin/mercury-start-consumers
diff --git a/java/tools/bin/mercury-start-producers b/java/tools/bin/mercury-start-producers
index 7ba0286f7c..7ba0286f7c 100644..100755
--- a/java/tools/bin/mercury-start-producers
+++ b/java/tools/bin/mercury-start-producers
diff --git a/java/tools/bin/qpid-jms-benchmark b/java/tools/bin/qpid-jms-benchmark
new file mode 100755
index 0000000000..3d712a27dc
--- /dev/null
+++ b/java/tools/bin/qpid-jms-benchmark
@@ -0,0 +1,316 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, time, qpid.messaging, re
+from threading import Thread
+from subprocess import Popen, PIPE, STDOUT
+
+op = optparse.OptionParser(usage="usage: %prog [options]",
+ description="simple performance benchmarks")
+op.add_option("-b", "--broker", default=[], action="append", type="str",
+ help="url of broker(s) to connect to, round robin on multiple brokers")
+op.add_option("-c", "--client-host", default=[], action="append", type="str",
+ help="host(s) to run clients on via ssh, round robin on mulple hosts")
+op.add_option("-q", "--queues", default=1, type="int", metavar="N",
+ help="create N queues (default %default)")
+op.add_option("-s", "--senders", default=1, type="int", metavar="N",
+ help="start N senders per queue (default %default)")
+op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
+ help="start N receivers per queue (default %default)")
+op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
+ help="send N messages per sender (default %default)")
+op.add_option("--queue-name", default="benchmark", metavar="NAME",
+ help="base name for queues (default %default)")
+op.add_option("--send-rate", default=0, metavar="N",
+ help="send rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--receive-rate", default=0, metavar="N",
+ help="receive rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
+ help="message size in bytes (default %default)")
+op.add_option("--ack-frequency", default=100, metavar="N", type="int",
+ help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
+op.add_option("--no-report-header", dest="report_header", default=True,
+ action="store_false", help="don't print header on report")
+op.add_option("--summarize", default=False, action="store_true",
+ help="print summary statistics for multiple senders/receivers: total throughput, average latency")
+op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
+op.add_option("--send-option", default=[], action="append", type="str",
+ help="Additional option for sending addresses")
+op.add_option("--receive-option", default=[], action="append", type="str",
+ help="Additional option for receiving addresses")
+op.add_option("--create-option", default=[], action="append", type="str",
+ help="Additional option for creating addresses")
+op.add_option("--send-arg", default=[], action="append", type="str",
+ help="Additional argument for qpid-send")
+op.add_option("--receive-arg", default=[], action="append", type="str",
+ help="Additional argument for qpid-receive")
+op.add_option("--no-timestamp", dest="timestamp", default=True,
+ action="store_false", help="don't add a timestamp, no latency results")
+op.add_option("--sequence", dest="sequence", default=False,
+ action="store_true", help="add a sequence number to each message")
+op.add_option("--connection-options", type="str",
+ help="Connection options for senders & receivers")
+op.add_option("--durable", default=False, action="store_true",
+ help="Use durable queues and messages")
+op.add_option("--save-received", default=False, action="store_true",
+ help="Save received message content to files <queuename>-receiver-<n>.msg")
+op.add_option("--verbose", default=False, action="store_true",
+ help="Show commands executed")
+op.add_option("--no-delete", default=False, action="store_true",
+ help="Don't delete the test queues.")
+op.add_option("--fill-drain", default=False, action="store_true",
+ help="First fill the queues, then drain them")
+
+single_quote_re = re.compile("'")
+def posix_quote(string):
+ """ Quote a string for use as an argument in a posix shell"""
+ return "'" + single_quote_re.sub("\\'", string) + "'";
+
+def ssh_command(host, command):
+ """ Convert command into an ssh command on host with quoting"""
+ return ["ssh", host] + [posix_quote(arg) for arg in command]
+
+class Clients:
+ def __init__(self): self.clients=[]
+
+ def add(self, client):
+ self.clients.append(client)
+ return client
+
+ def kill(self):
+ for c in self.clients:
+ try: c.kill()
+ except: pass
+
+class PopenCommand(Popen):
+ """Like Popen but you can query for the command"""
+ def __init__(self, command, *args, **kwargs):
+ self.command = command
+ Popen.__init__(self, command, *args, **kwargs)
+
+clients = Clients()
+
+def start_receive(queue, index, opts, ready_queue, broker, host):
+ address_opts=opts.receive_option
+ if opts.durable: address_opts += ["node:{durable:true}"]
+ address="%s;{%s}"%(queue,",".join(address_opts))
+ msg_total=opts.senders*opts.messages
+ messages = msg_total/opts.receivers;
+ if (index < msg_total%opts.receivers): messages += 1
+ if (messages == 0): return None
+ command = ["qpid-jms-receive",
+ #"-b", broker,
+ "--ready-address", "benchmark-ready;{create:always}",
+ "-a", address,
+ "-m", str(messages),
+ "--forever",
+ "--print-content=no",
+ # "--receive-rate", str(opts.receive_rate),
+ "--report-total",
+ "--ack-frequency", str(opts.ack_frequency),
+ # "--ready-address", "%s;{create:always}"%ready_queue,
+ "--report-header=no -v"
+ ]
+ if opts.save_received:
+ command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
+ command += opts.receive_arg
+ if opts.connection_options:
+ command += ["--connection-options",opts.connection_options]
+ if host: command = ssh_command(host, command)
+ if opts.verbose: print "Receiver: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def start_send(queue, opts, broker, host):
+ address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
+ command = ["qpid-jms-send",
+ #"-b", broker,
+ "-a", address,
+ "--messages", str(opts.messages),
+ "--content-size", str(opts.content_size),
+ "--send-rate", str(opts.send_rate),
+ "--report-total",
+ "--report-header=no",
+ "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
+ "--sequence=%s"%(opts.sequence and "yes" or "no"),
+ "--durable", str(opts.durable)
+ ]
+ command += opts.send_arg
+ if opts.connection_options:
+ command += ["--connection-options",opts.connection_options]
+ if host: command = ssh_command(host, command)
+ if opts.verbose: print "Sender: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def error_msg(out, err):
+ return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
+
+def first_line(p):
+ out,err=p.communicate()
+ if p.returncode != 0:
+ raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
+
+ print str(out)
+ print str(err)
+ return out.split("\n")[0]
+
+def recreate_queues(queues, brokers, no_delete, opts):
+ c = qpid.messaging.Connection(brokers[0])
+ c.open()
+ s = c.session()
+ for q in queues:
+ if not no_delete:
+ try: s.sender("%s;{delete:always}"%(q)).close()
+ except qpid.messaging.exceptions.NotFound: pass
+ address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
+ if opts.verbose: print "Creating", address
+ s.sender(address)
+ c.close()
+
+def print_header(timestamp):
+ if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
+ else: latency_header=""
+ print "send-tp\trecv-tp%s"%latency_header
+
+def parse(parser, lines): # Parse sender/receiver output
+ return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
+
+def parse_senders(senders):
+ return parse([int],[first_line(p) for p in senders])
+
+def parse_receivers(receivers):
+ return parse([int,float,float,float],[first_line(p) for p in receivers if p])
+
+def print_data(send_stats, recv_stats, total_tp):
+ for send,recv in map(None, send_stats, recv_stats):
+ line=""
+ if send: line += "%d"%send[0]
+ if recv:
+ line += "\t%d"%recv[0]
+ if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ if total_tp is not None:
+ line += "\t%d"%total_tp
+ total_tp = None
+ print line
+
+def print_summary(send_stats, recv_stats, total_tp):
+ def avg(s): sum(s) / len(s)
+ send_tp = sum([l[0] for l in send_stats])
+ recv_tp = sum([l[0] for l in recv_stats])
+ summary = "%d\t%d"%(send_tp, recv_tp)
+ if recv_stats and len(recv_stats[0]) == 4:
+ l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
+ l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
+ l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
+ summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
+ summary += "\t%d"%total_tp
+ print summary
+
+
+class ReadyReceiver:
+ """A receiver for ready messages"""
+ def __init__(self, queue, broker):
+ self.connection = qpid.messaging.Connection(broker)
+ self.connection.open()
+ self.receiver = self.connection.session().receiver(
+ "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
+ self.receiver.session.sync()
+ self.timeout=10
+
+ def wait(self, receivers):
+ try:
+ for i in receivers: self.receiver.fetch(self.timeout)
+ self.connection.close()
+ except qpid.messaging.Empty:
+ for r in receivers:
+ if (r.poll() is not None):
+ out,err=r.communicate()
+ raise Exception("Receiver error: %s\n%s" %
+ (" ".join(r.command), error_msg(out,err)))
+ raise Exception("Timed out waiting for receivers to be ready")
+
+def flatten(l):
+ return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
+
+class RoundRobin:
+ def __init__(self,items):
+ self.items = items
+ self.index = 0
+
+ def next(self):
+ if not self.items: return None
+ ret = self.items[self.index]
+ self.index = (self.index+1)%len(self.items)
+ return ret
+
+def main():
+ opts, args = op.parse_args()
+ opts.client_host = flatten(opts.client_host)
+ if not opts.broker:
+ if opts.client_host:
+ raise Exception("--broker must be specified if --client_host is.")
+ opts.broker = ["127.0.0.1"] # Deafult to local broker
+ opts.broker = flatten(opts.broker)
+ brokers = RoundRobin(opts.broker)
+ client_hosts = RoundRobin(opts.client_host)
+ send_out = ""
+ receive_out = ""
+ ready_queue="%s-ready"%(opts.queue_name)
+ queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
+ try:
+ for i in xrange(opts.repeat):
+ recreate_queues(queues, opts.broker, opts.no_delete, opts)
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+
+ def start_receivers():
+ return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers) ]
+
+
+ def start_senders():
+ return [ start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders) ]
+
+ if opts.report_header and i == 0: print_header(opts.timestamp)
+
+ if opts.fill_drain:
+ # First fill the queues, then drain them
+ start = time.time()
+ senders = start_senders()
+ for p in senders: p.wait()
+ receivers = start_receivers()
+ for p in receivers: p.wait()
+ else:
+ # Run senders and receivers in parallel
+ receivers = start_receivers()
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready
+ start = time.time()
+ senders = start_senders()
+ for p in senders + receivers: p.wait()
+
+ total_sent = opts.queues * opts.senders * opts.messages
+ total_tp = total_sent / (time.time()-start)
+ #send_stats=parse_senders(senders)
+ recv_stats=parse_receivers(receivers)
+ #if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
+ #else: print_data(send_stats, recv_stats, total_tp)
+ finally: clients.kill() # No strays
+
+if __name__ == "__main__": main()
+
diff --git a/java/tools/bin/qpid-jms-receive b/java/tools/bin/qpid-jms-receive
new file mode 100755
index 0000000000..57abe874ff
--- /dev/null
+++ b/java/tools/bin/qpid-jms-receive
@@ -0,0 +1,193 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This starts the controller for coordinating perf tests/
+
+. check-qpid-java-env
+
+PROGRAM_NAME="qpid-jms-receive"
+URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
+ADDRESS="queue;{create:always}"
+TIMEOUT="0"
+FOREVER="false"
+MESSAGES="1"
+IGNORE_DUPLICATES="false"
+CHECK_REDELIVERED="false"
+CAPACITY="1000"
+ACK_FREQUENCY="100"
+TX="0"
+ROLLBACL_FREQUENCY="0"
+PRINT_CONTENT="false"
+PRINT_HEADERS="false"
+REPORT_TOTAL="false"
+REPORT_EVERY="0"
+REPORT_HEADER="true"
+READY_ADDRES="''"
+EXTRA_JVM_ARGS=""
+VERBOSE="0"
+
+TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
+
+TEMP=$(getopt -n $PROGRAM_NAME -o b:a:f:m:vh\
+ --long broker:,address:,timeout:,forever\
+,messages:,ignore-duplicates,check-redelivered\
+,capacity:,ack-frequency:,tx:,rollback-frequency:\
+,print-content:,print-headers:,report-total\
+,report-every:,report-header:,ready-address:\
+,jvm-args:,verbose,help -- "$@")
+
+# padding the option string with 4 spaces
+# padding the desc string with 30 spaces
+usage()
+{
+ printf "\n%s\n" "Usage: $PROGRAM_NAME [option].."
+
+ printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to"
+
+ printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from"
+
+ printf "\n%25s\n%71s\n" "--timeout TIMEOUT (0)" "timeout in seconds to wait before exiting"
+
+ printf "\n%17s\n%61s\n" "-f, --forever" "ignore timeout and wait forever"
+
+ printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely"
+
+ printf "\n%23s\n%84s\n" "--ignore-duplicates" "Detect and ignore duplicates (by checking 'sn' header)"
+
+ printf "\n%23s\n%82s\n%92s\n" "--check-redelivered" "Fails with exception if a duplicate is not marked as" " redelivered (only relevant when ignore-duplicates is selected)"
+
+ printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)"
+
+ printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)"
+
+ printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)"
+
+ printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)"
+
+ printf "\n%30s\n%55s\n" "--print-content yes|no (0)" "print out message content"
+
+ printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers"
+
+ printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics"
+
+ printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages"
+
+ printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report"
+
+ printf "\n%27s\n%82s\n" "--ready-address ADDRESS" "send a message to this address when ready to receive"
+
+ printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify"
+
+ printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script"
+}
+
+eval set -- "$TEMP"
+while true; do
+ case $1 in
+ -b|--broker)
+ URL="$2"; shift; shift; continue
+ ;;
+ -a|--address)
+ ADDRESS="$2"; shift; shift; continue
+ ;;
+ --timeout)
+ TIMEOUT="$2"; shift; shift; continue
+ ;;
+ -f|--forever)
+ FOREVER="$2"; shift; shift; continue
+ ;;
+ -m|--messages)
+ MESSAGES="$2"; shift; shift; continue
+ ;;
+ --ignore-duplicates)
+ IGNORE_DUPLICATES="true"; shift; continue
+ ;;
+ --check-redelivered)
+ CHECK_REDELIVERED="true"; shift; continue
+ ;;
+ --capacity)
+ CAPACITY="$2"; shift; shift; continue
+ ;;
+ --ack-frequency)
+ ACK_FREQUENCY="$2"; shift; shift; continue
+ ;;
+ --tx)
+ TX="$2"; shift; shift; continue
+ ;;
+ --rollback-frequency)
+ ROLLBACK_FREQUENCY="$2"; shift; shift; continue
+ ;;
+ --print-content)
+ if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue
+ ;;
+ --print-headers)
+ if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue
+ ;;
+ --report-total)
+ REPORT_TOTAL="true"; shift; continue
+ ;;
+ --report-every)
+ REPORT_EVERY="$2"; shift; shift; continue
+ ;;
+ --report-header)
+ if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue
+ ;;
+ --ready-address)
+ READY_ADDRESS="$2"; shift; shift; continue
+ ;;
+ -a|--jvm-args)
+ EXTRA_JVM_ARGS="$2"; shift; shift; continue
+ ;;
+ -h|--help)
+ usage
+ exit 0
+ ;;
+ -v|--verbose)
+ VERBOSE="1"; shift; continue
+ ;;
+ --)
+ # no more arguments to parse
+ break
+ ;;
+ *)
+ # no more arguments to parse
+ break
+ ;;
+ esac
+done
+
+RECEIVER_ARGS="-server -Durl=$URL \
+-Daddress=$ADDRESS \
+-Dtimeout=$TIMEOUT \
+-Dmsg-count=$MESSAGES \
+-Dack-frequency=$ACK_FREQUENCY \
+-Dtx=$TX \
+-Drollback-frequnecy=$ROLLBACL_FREQUENCY \
+-Dprint-content=$PRINT_CONTENT \
+-Dprint-headers=$PRINT_HEADERS \
+-Dreport-total=$REPORT_TOTAL \
+-Dreport-every=$REPORT_EVERY \
+-Dreport-header=$REPORT_HEADER \
+-Dmax_prefetch=$CAPACITY "
+
+if [ "x$READY_ADDRESS" != "x" ]; then RECEIVER_ARGS="$RECEIVER_ARGS -Dready-address=$READY_ADDRESS"; fi
+if [ "$VERBOSE" == "1" ]; then echo $RECEIVER_ARGS; fi
+echo $RECEIVER_ARGS
+$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $RECEIVER_ARGS org.apache.qpid.tools.QpidReceive
diff --git a/java/tools/bin/qpid-jms-send b/java/tools/bin/qpid-jms-send
new file mode 100755
index 0000000000..d7695924f0
--- /dev/null
+++ b/java/tools/bin/qpid-jms-send
@@ -0,0 +1,261 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This starts the controller for coordinating perf tests/
+
+. check-qpid-java-env
+
+PROGRAM_NAME="qpid-jms-send"
+URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
+ADDRESS="queue;{create:always}"
+MESSAGES="1"
+ID=""
+REPLY_TO=""
+SEND_EOS="1"
+DURABLE="false"
+TTL="0"
+PRIORITY="0"
+PROPERTY=""
+CORRELATION_ID=""
+USER_ID=""
+CONTENT_STRING=""
+CONTENT_SIZE="1024"
+CONTENT_MAP=""
+CAPACITY="1000"
+ACK_FREQUENCY="100"
+TX="0"
+ROLLBACL_FREQUENCY="0"
+PRINT_CONTENT="true"
+PRINT_HEADERS="false"
+REPORT_TOTAL="false"
+REPORT_EVERY="0"
+REPORT_HEADER="true"
+SEND_RATE="-1"
+SEQUNCE="1"
+DISABLE_TIMESTAMP="false"
+EXTRA_JVM_ARGS=""
+VERBOSE="0"
+
+TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
+
+TEMP=$(getopt -n $PROGRAM_NAME -o b:a:m:i:P:M:vh\
+ --long broker:,address:,messages:,id:,reply-to:\
+,send-eos:,durable:,ttl:,property:,correlational-id:\
+,user-id:,content-string:,content-size:,content-map:\
+,capacity:,ack-frequency:,tx:,rollback-frequency:\
+,print-content:,print-headers:,report-total\
+,report-every:,report-header:,send-rate:,sequence:,timestamp:\
+,jvm-args:,verbose,help -- "$@")
+
+# padding the option string with 4 spaces
+# padding the desc string with 30 spaces
+usage()
+{
+ printf "\n%s\n" "Usage: $PROGRAM_NAME [option].."
+
+ printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to"
+
+ printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from"
+
+ printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely"
+
+ printf "\n%15s\n%75s\n" "-i, --id ID" "use the supplied id instead of generating one"
+
+ printf "\n%23s\n%54s\n" "--reply-to REPLY-TO" "specify reply-to address"
+
+ printf "\n%20s\n%70s\n" "--send-eos N (0)" "send N EOS messages to mark end of input"
+
+ printf "\n%24s\n%54s\n" "--durable yes|no (0)" "mark messages as durable"
+
+ printf "\n%19s\n%72s\n" "--ttl msecs (0)" "time-to-live for messages, in milliseconds"
+
+ printf "\n%27s\n%72s\n" "--priority PRIORITY (0)" "time-to-live for messages, in milliseconds"
+
+ printf "\n%29s\n%54s\n" "-P, --property NAME=VALUE" "specify message property"
+
+ printf "\n%23s\n%57s\n" "--correlation-id ID" "correlation-id for message"
+
+ printf "\n%20s\n%48s\n" "--user-id USERID" "userid for message"
+
+ printf "\n%28s\n%60s\n" "--content-string CONTENT" "use CONTENT as message content"
+
+ printf "\n%24s\n%62s\n" "--content-size N (0)" "create an N-byte message content"
+
+ printf "\n%32s\n%59s\n" "-M, --content-map NAME=VALUE" "specify entry for map content"
+
+ printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)"
+
+ printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)"
+
+ printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)"
+
+ printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)"
+
+ printf "\n%30s\n%55s\n" "--print-content yes|no (1)" "print out message content"
+
+ printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers"
+
+ printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics"
+
+ printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages"
+
+ printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report"
+
+ printf "\n%21s\n%64s\n%62s\n" "--send-rate N (0)" "Send at rate of N messages/second." "0 means send as fast as possible"
+
+ printf "\n%25s\n%69s\n%77s\n" "--sequence yes|no (1)" "Add a sequence number messages property" "(required for duplicate/lost message detection)"
+
+ printf "\n%26s\n%64s\n%77s\n" "--timestamp yes|no (1)" "Add a time stamp messages property" "(required for duplicate/lost message detection)"
+
+ printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify"
+
+ printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script"
+}
+
+eval set -- "$TEMP"
+while true; do
+ case $1 in
+ -b|--broker)
+ URL="$2"; shift; shift; continue
+ ;;
+ -a|--address)
+ ADDRESS="$2"; shift; shift; continue
+ ;;
+ -m|--messages)
+ MESSAGES="$2"; shift; shift; continue
+ ;;
+ -i|--id)
+ ID="$2"; shift; shift; continue
+ ;;
+ --reply-to)
+ REPLY_TO="$2"; shift; shift; continue
+ ;;
+ --send-eos)
+ SEND_EOS="$2"; shift; shift; continue
+ ;;
+ --durable)
+ if [ "$2" == "1" ]; then DURABLE="true"; else DURABLE="false"; fi; shift; shift; continue
+ ;;
+ --ttl)
+ TTL="$2"; shift; shift; continue
+ ;;
+ --priority)
+ PRIORITY="$2"; shift; shift; continue
+ ;;
+ -P|--property)
+ PROPERTY="$2,$PROPERTY"; shift; shift; continue
+ ;;
+ --correlation-id)
+ CORRELATION_ID="$2"; shift; shift; continue
+ ;;
+ --user-id)
+ USER_ID="$2"; shift; shift; continue
+ ;;
+ --content-string)
+ CONTENT_STRING="$2"; shift; shift; continue
+ ;;
+ --content-size)
+ CONTENT_SIZE="$2"; shift; shift; continue
+ ;;
+ -M|--content-map)
+ CONTENT_MAP="$2,$CONTENT_MAP"; shift; shift; continue
+ ;;
+ --capacity)
+ CAPACITY="$2"; shift; shift; continue
+ ;;
+ --ack-frequency)
+ ACK_FREQUENCY="$2"; shift; shift; continue
+ ;;
+ --tx)
+ TX="$2"; shift; shift; continue
+ ;;
+ --rollback-frequency)
+ ROLLBACK_FREQUENCY="$2"; shift; shift; continue
+ ;;
+ --print-content)
+ if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue
+ ;;
+ --print-headers)
+ if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue
+ ;;
+ --report-total)
+ REPORT_TOTAL="true"; shift; continue
+ ;;
+ --report-every)
+ REPORT_EVERY="$2"; shift; shift; continue
+ ;;
+ --report-header)
+ if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue
+ ;;
+ --send-rate)
+ SEND_RATE="$2"; shift; shift; continue
+ ;;
+ --sequence)
+ if [ "$2" == "yes" ]; then SEQUENCE="true"; else SEQUENCE="false"; fi; shift; shift; continue
+ ;;
+ --timestamp)
+ if [ "$2" == "yes" ]; then DISABLE_TIMESTAMP="false"; else DISABLE_TIMESTAMP="true"; fi; shift; shift; continue
+ ;;
+ -a|--jvm-args)
+ EXTRA_JVM_ARGS="$2"; shift; shift; continue
+ ;;
+ -h|--help)
+ usage
+ exit 0
+ ;;
+ -v|--verbose)
+ VERBOSE="1"; shift; continue
+ ;;
+ --)
+ # no more arguments to parse
+ break
+ ;;
+ *)
+ # no more arguments to parse
+ break
+ ;;
+ esac
+done
+
+SENDER_ARGS="-server -Durl=$URL \
+-Daddress=$ADDRESS \
+-Dmsg-count=$MESSAGES \
+-Dsend-eos=$SEND_EOS \
+-Ddurable=$DURABLE \
+-Dmsg_size=$CONTENT_SIZE \
+-Dsend-rate=$SEND_RATE \
+-Ddisable-timestamp=$DISABLE_TIMESTAMP \
+-Dttl=$TTL \
+-Dpriority=$PRIORITY \
+-Dtx=$TX \
+-Drollback-frequnecy=$ROLLBACK_FREQUENCY \
+-Dprint-content=$PRINT_CONTENT \
+-Dprint-headers=$PRINT_HEADERS \
+-Dreport-total=$REPORT_TOTAL \
+-Dreport-every=$REPORT_EVERY \
+-Dreport-header=$REPORT_HEADER \
+-Dmax_prefetch=$CAPACITY "
+
+if [ "x$ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Did=$ID"; fi
+if [ "x$USER_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Duser_id=$USER_ID"; fi
+if [ "x$CORRELATION_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Dcorrelation_id=$CORRELATION_ID"; fi
+
+if [ "$VERBOSE" == "1" ]; then echo $SENDER_ARGS; fi
+$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $SENDER_ARGS org.apache.qpid.tools.QpidSend
diff --git a/java/tools/build.xml b/java/tools/build.xml
index 7cd1b1172c..99b0375e95 100644
--- a/java/tools/build.xml
+++ b/java/tools/build.xml
@@ -7,9 +7,9 @@
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- -
+ -
- http://www.apache.org/licenses/LICENSE-2.0
- -
+ -
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,6 @@
-
-->
<project name="Qpid Tools" default="build">
-
<property name="module.depends" value="client common"/>
<import file="../module.xml"/>
diff --git a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
index dbc73c404f..de7748acd6 100644
--- a/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
+++ b/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.testkit;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.testkit;
* under the License.
*
*/
+package org.apache.qpid.testkit;
public interface ErrorHandler {
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
index 4e79dd62a8..7eb83a520b 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.tools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* In the future this will be replaced by a Clock abstraction
* that can utilize a realtime clock when running in RT Java.
@@ -27,6 +30,8 @@ package org.apache.qpid.tools;
public class Clock
{
+ private static final Logger _logger = LoggerFactory.getLogger(Clock.class);
+
public final static long SEC = 60000;
private static Precision precision;
@@ -54,7 +59,11 @@ public class Clock
precision = Precision.getPrecision(System.getProperty("precision","mili"));
//offset = Long.getLong("offset",-1);
- System.out.println("Using precision : " + precision + " and offset " + offset);
+ if (_logger.isDebugEnabled())
+ {
+ System.out.println("Using precision : " + precision );
+ //+ " and offset " + offset);
+ }
}
public static Precision getPrecision()
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
index c6abdf6c84..e0e48519f3 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
@@ -29,383 +29,422 @@ import org.apache.qpid.client.AMQConnection;
public class JVMArgConfiguration implements TestConfiguration
{
- /*
- * By default the connection URL is used.
- * This allows a user to easily specify a fully fledged URL any given property.
- * Ex. SSL parameters
- *
- * By providing a host & port allows a user to simply override the URL.
- * This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
- */
- private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
+ /*
+ * By default the connection URL is used.
+ * This allows a user to easily specify a fully fledged URL any given property.
+ * Ex. SSL parameters
+ *
+ * By providing a host & port allows a user to simply override the URL.
+ * This allows to create multiple clients in test scripts easily,
+ * without having to deal with the long URL format.
+ */
+ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
- private String host = "";
+ private String host = "";
- private int port = -1;
+ private int port = -1;
- private String address = "queue; {create : always}";
+ private String address = "queue; {create : always}";
- private int msg_size = 1024;
+ private long timeout = 0;
- private int random_msg_size_start_from = 1;
+ private int msg_size = 1024;
- private boolean cacheMessage = false;
+ private int random_msg_size_start_from = 1;
- private boolean disableMessageID = false;
+ private boolean cacheMessage = false;
- private boolean disableTimestamp = false;
+ private boolean disableMessageID = false;
- private boolean durable = false;
+ private boolean disableTimestamp = false;
- private int transaction_size = 0;
+ private boolean durable = false;
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+ private int transaction_size = 0;
- private int msg_count = 10;
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
- private int warmup_count = 1;
+ private int msg_count = 10;
- private boolean random_msg_size = false;
+ private int warmup_count = 1;
- private String msgType = "bytes";
+ private boolean random_msg_size = false;
- private boolean printStdDev = false;
+ private String msgType = "bytes";
- private int sendRate = 0;
+ private boolean printStdDev = false;
- private boolean externalController = false;
+ private int sendRate = 0;
- private boolean useUniqueDest = false; // useful when using multiple connections.
+ private boolean externalController = false;
- private int ackFrequency = 100;
+ private boolean useUniqueDest = false; // useful when using multiple connections.
- private DecimalFormat df = new DecimalFormat("###.##");
+ private int ackFrequency = 100;
- private int reportEvery = 0;
+ private DecimalFormat df = new DecimalFormat("###.##");
- private boolean isReportTotal = false;
-
- private boolean isReportHeader = true;
+ private int reportEvery = 0;
- private boolean isReportLatency = false;
-
- private int sendEOS = 0;
-
- private int connectionCount = 1;
-
- private int rollbackFrequency = 0;
-
- private boolean printHeaders;
-
- public JVMArgConfiguration()
- {
-
- url = System.getProperty("url",url);
- host = System.getProperty("host","");
- port = Integer.getInteger("port", -1);
- address = System.getProperty("address",address);
-
- msg_size = Integer.getInteger("msg-size", 1024);
- cacheMessage = Boolean.getBoolean("cache-msg");
- disableMessageID = Boolean.getBoolean("disable-message-id");
- disableTimestamp = Boolean.getBoolean("disable-timestamp");
- durable = Boolean.getBoolean("durable");
- transaction_size = Integer.getInteger("tx",1000);
- ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE);
- msg_count = Integer.getInteger("msg-count",msg_count);
- warmup_count = Integer.getInteger("warmup-count",warmup_count);
- random_msg_size = Boolean.getBoolean("random-msg-size");
- msgType = System.getProperty("msg-type","bytes");
- printStdDev = Boolean.getBoolean("print-std-dev");
- sendRate = Integer.getInteger("rate",0);
- externalController = Boolean.getBoolean("ext-controller");
- useUniqueDest = Boolean.getBoolean("use-unique-dest");
- random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1);
- reportEvery = Integer.getInteger("report-every");
- isReportTotal = Boolean.getBoolean("report-total");
- isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header");
- isReportLatency = Boolean.getBoolean("report-latency");
- sendEOS = Integer.getInteger("send-eos");
- connectionCount = Integer.getInteger("con_count",1);
- ackFrequency = Integer.getInteger("ack-frequency");
- rollbackFrequency = Integer.getInteger("rollback-frequency");
- printHeaders = Boolean.getBoolean("print-headers");
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getUrl()
- */
- @Override
- public String getUrl()
- {
- return url;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getHost()
- */
- @Override
- public String getHost()
- {
- return host;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getPort()
- */
- @Override
- public int getPort()
- {
- return port;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAddress()
- */
- @Override
- public String getAddress()
- {
- return address;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAckMode()
- */
- @Override
- public int getAckMode()
- {
- return ack_mode;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMsgCount()
- */
- @Override
- public int getMsgCount()
- {
- return msg_count;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMsgSize()
- */
- @Override
- public int getMsgSize()
- {
- return msg_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom()
- */
- @Override
- public int getRandomMsgSizeStartFrom()
- {
- return random_msg_size_start_from;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDurable()
- */
- @Override
- public boolean isDurable()
- {
- return durable;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isTransacted()
- */
- @Override
- public boolean isTransacted()
- {
- return transaction_size > 0;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize()
- */
- @Override
- public int getTransactionSize()
- {
- return transaction_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount()
- */
- @Override
- public int getWarmupCount()
- {
- return warmup_count;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage()
- */
- @Override
- public boolean isCacheMessage()
- {
- return cacheMessage;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID()
- */
- @Override
- public boolean isDisableMessageID()
- {
- return disableMessageID;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp()
- */
- @Override
- public boolean isDisableTimestamp()
- {
- return disableTimestamp;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize()
- */
- @Override
- public boolean isRandomMsgSize()
- {
- return random_msg_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMessageType()
- */
- @Override
- public String getMessageType()
- {
- return msgType;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev()
- */
- @Override
- public boolean isPrintStdDev()
- {
- return printStdDev;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getSendRate()
- */
- @Override
- public int getSendRate()
- {
- return sendRate;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isExternalController()
- */
- @Override
- public boolean isExternalController()
- {
- return externalController;
- }
-
- public void setAddress(String addr)
- {
- address = addr;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests()
- */
- @Override
- public boolean isUseUniqueDests()
- {
- return useUniqueDest;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency()
- */
- @Override
- public int getAckFrequency()
- {
- return ackFrequency;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#createConnection()
- */
- @Override
- public Connection createConnection() throws Exception
- {
- if (getHost().equals("") || getPort() == -1)
- {
- return new AMQConnection(getUrl());
- }
- else
- {
- return new AMQConnection(getHost(),getPort(),"guest","guest","test","test");
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat()
- */
- @Override
- public DecimalFormat getDecimalFormat()
- {
- return df;
- }
-
- @Override
- public int reportEvery()
- {
- return reportEvery;
- }
-
- @Override
- public boolean isReportTotal()
- {
- return isReportTotal;
- }
-
- @Override
- public boolean isReportHeader()
- {
- return isReportHeader;
- }
-
- @Override
- public boolean isReportLatency()
- {
- return isReportLatency;
- }
-
- @Override
- public int getSendEOS()
- {
- return sendEOS;
- }
-
- @Override
- public int getConnectionCount()
- {
- return connectionCount;
- }
-
- @Override
- public int getRollbackFrequency()
- {
- return rollbackFrequency;
- }
-
- @Override
- public boolean isPrintHeaders()
- {
- return printHeaders;
- }
+ private boolean isReportTotal = false;
+
+ private boolean isReportHeader = true;
+
+ private int sendEOS = 0;
+
+ private int connectionCount = 1;
+
+ private int rollbackFrequency = 0;
+
+ private boolean printHeaders;
+
+ private boolean printContent;
+
+ private long ttl;
+
+ private int priority;
+
+ private String readyAddress;
+
+ public JVMArgConfiguration()
+ {
+
+ url = System.getProperty("url",url);
+ host = System.getProperty("host","");
+ port = Integer.getInteger("port", -1);
+ address = System.getProperty("address",address);
+
+ timeout = Long.getLong("timeout",0);
+ msg_size = Integer.getInteger("msg-size", 0);
+ cacheMessage = true; //Boolean.getBoolean("cache-msg");
+ disableMessageID = Boolean.getBoolean("disable-message-id");
+ disableTimestamp = Boolean.getBoolean("disable-timestamp");
+ durable = Boolean.getBoolean("durable");
+ transaction_size = Integer.getInteger("tx",1000);
+ ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE);
+ msg_count = Integer.getInteger("msg-count",msg_count);
+ warmup_count = Integer.getInteger("warmup-count",warmup_count);
+ random_msg_size = Boolean.getBoolean("random-msg-size");
+ msgType = System.getProperty("msg-type","bytes");
+ printStdDev = Boolean.getBoolean("print-std-dev");
+ sendRate = Integer.getInteger("rate",0);
+ externalController = Boolean.getBoolean("ext-controller");
+ useUniqueDest = Boolean.getBoolean("use-unique-dest");
+ random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1);
+ reportEvery = Integer.getInteger("report-every",0);
+ isReportTotal = Boolean.getBoolean("report-total");
+ isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header");
+ sendEOS = Integer.getInteger("send-eos",1);
+ connectionCount = Integer.getInteger("con_count",1);
+ ackFrequency = Integer.getInteger("ack-frequency",100);
+ rollbackFrequency = Integer.getInteger("rollback-frequency",0);
+ printHeaders = Boolean.getBoolean("print-headers");
+ printContent = Boolean.getBoolean("print-content");
+ ttl = Long.getLong("ttl", 0);
+ priority = Integer.getInteger("priority", 0);
+ readyAddress = System.getProperty("ready-address");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getUrl()
+ */
+ @Override
+ public String getUrl()
+ {
+ return url;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getHost()
+ */
+ @Override
+ public String getHost()
+ {
+ return host;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getPort()
+ */
+ @Override
+ public int getPort()
+ {
+ return port;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getAddress()
+ */
+ @Override
+ public String getAddress()
+ {
+ return address;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getTimeout()
+ */
+ @Override
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getAckMode()
+ */
+ @Override
+ public int getAckMode()
+ {
+ return ack_mode;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getMsgCount()
+ */
+ @Override
+ public int getMsgCount()
+ {
+ return msg_count;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getMsgSize()
+ */
+ @Override
+ public int getMsgSize()
+ {
+ return msg_size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom()
+ */
+ @Override
+ public int getRandomMsgSizeStartFrom()
+ {
+ return random_msg_size_start_from;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isDurable()
+ */
+ @Override
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isTransacted()
+ */
+ @Override
+ public boolean isTransacted()
+ {
+ return transaction_size > 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize()
+ */
+ @Override
+ public int getTransactionSize()
+ {
+ return transaction_size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount()
+ */
+ @Override
+ public int getWarmupCount()
+ {
+ return warmup_count;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage()
+ */
+ @Override
+ public boolean isCacheMessage()
+ {
+ return cacheMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID()
+ */
+ @Override
+ public boolean isDisableMessageID()
+ {
+ return disableMessageID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp()
+ */
+ @Override
+ public boolean isDisableTimestamp()
+ {
+ return disableTimestamp;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize()
+ */
+ @Override
+ public boolean isRandomMsgSize()
+ {
+ return random_msg_size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getMessageType()
+ */
+ @Override
+ public String getMessageType()
+ {
+ return msgType;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev()
+ */
+ @Override
+ public boolean isPrintStdDev()
+ {
+ return printStdDev;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getSendRate()
+ */
+ @Override
+ public int getSendRate()
+ {
+ return sendRate;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isExternalController()
+ */
+ @Override
+ public boolean isExternalController()
+ {
+ return externalController;
+ }
+
+ public void setAddress(String addr)
+ {
+ address = addr;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests()
+ */
+ @Override
+ public boolean isUseUniqueDests()
+ {
+ return useUniqueDest;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency()
+ */
+ @Override
+ public int getAckFrequency()
+ {
+ return ackFrequency;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#createConnection()
+ */
+ @Override
+ public Connection createConnection() throws Exception
+ {
+ if (getHost().equals("") || getPort() == -1)
+ {
+ return new AMQConnection(getUrl());
+ }
+ else
+ {
+ return new AMQConnection(getHost(),getPort(),"guest","guest","test","test");
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat()
+ */
+ @Override
+ public DecimalFormat getDecimalFormat()
+ {
+ return df;
+ }
+
+ @Override
+ public int reportEvery()
+ {
+ return reportEvery;
+ }
+
+ @Override
+ public boolean isReportTotal()
+ {
+ return isReportTotal;
+ }
+
+ @Override
+ public boolean isReportHeader()
+ {
+ return isReportHeader;
+ }
+
+ @Override
+ public int getSendEOS()
+ {
+ return sendEOS;
+ }
+
+ @Override
+ public int getConnectionCount()
+ {
+ return connectionCount;
+ }
+
+ @Override
+ public int getRollbackFrequency()
+ {
+ return rollbackFrequency;
+ }
+
+ @Override
+ public boolean isPrintHeaders()
+ {
+ return printHeaders;
+ }
+
+ @Override
+ public boolean isPrintContent()
+ {
+ return printContent;
+ }
+
+ @Override
+ public long getTTL()
+ {
+ return ttl;
+ }
+
+ @Override
+ public int getPriority()
+ {
+ return priority;
+ }
+
+ @Override
+ public String getReadyAddress()
+ {
+ return readyAddress;
+ }
}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
index 8ab1379fce..a0ba928e1f 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.tools;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.tools;
* under the License.
*
*/
+package org.apache.qpid.tools;
import javax.jms.BytesMessage;
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
index 02f011f1b9..6dd8b7e1ca 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
@@ -28,10 +28,12 @@ import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.tools.TestConfiguration.MessageType;
import org.apache.qpid.tools.report.BasicReporter;
import org.apache.qpid.tools.report.Reporter;
@@ -42,140 +44,164 @@ import org.slf4j.LoggerFactory;
public class QpidReceive implements MessageListener
{
- private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
- private final CountDownLatch testCompleted = new CountDownLatch(1);
-
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageConsumer consumer;
- private boolean transacted = false;
- private boolean isRollback = false;
- private int txSize = 0;
- private int rollbackFrequency = 0;
- private int ackFrequency = 0;
- private int expected = 0;
- private int received = 0;
- private Reporter report;
- private TestConfiguration config;
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else if (config.getAckFrequency() > 0)
- {
- session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
- isRollback = config.getRollbackFrequency() > 0;
- rollbackFrequency = config.getRollbackFrequency();
- ackFrequency = config.getAckFrequency();
- }
-
- public void resetCounters()
- {
- received = 0;
- expected = 0;
- report.clear();
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage &&
- TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
- {
- testCompleted.countDown();
- return;
- }
-
- received++;
- report.message(msg);
-
- if (transacted && (received % txSize == 0))
- {
- if (isRollback && (received % rollbackFrequency == 0))
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- else if (ackFrequency > 0)
- {
- msg.acknowledge();
- }
-
- if (expected >= received)
- {
- testCompleted.countDown();
- }
-
- }
- catch(Exception e)
- {
- _logger.error("Error when receiving messages",e);
- }
-
- }
-
- public void waitforCompletion(int expected) throws Exception
- {
- this.expected = expected;
- testCompleted.await();
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader()
- );
- Destination dest = AMQDestination.createDestination(config.getAddress());
- QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
- receiver.setUp();
- receiver.waitforCompletion(config.getMsgCount());
- if (config.isReportTotal())
- {
- reporter.report();
- }
- receiver.tearDown();
- }
+ private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class);
+ private final CountDownLatch testCompleted = new CountDownLatch(1);
+
+ private Connection con;
+ private Session session;
+ private Destination dest;
+ private MessageConsumer consumer;
+ private boolean transacted = false;
+ private boolean isRollback = false;
+ private int txSize = 0;
+ private int rollbackFrequency = 0;
+ private int ackFrequency = 0;
+ private int expected = 0;
+ private int received = 0;
+ private Reporter report;
+ private TestConfiguration config;
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
+ {
+ this(report,config, con, dest, UUID.randomUUID().toString());
+ }
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
+ {
+ //System.out.println("Producer ID : " + id);
+ this.report = report;
+ this.config = config;
+ this.con = con;
+ this.dest = dest;
+ }
+
+ public void setUp() throws Exception
+ {
+ con.start();
+ if (config.isTransacted())
+ {
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else if (config.getAckFrequency() > 0)
+ {
+ session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ }
+ else
+ {
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+ if (_logger.isDebugEnabled())
+ {
+ System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
+ }
+
+ transacted = config.isTransacted();
+ txSize = config.getTransactionSize();
+ isRollback = config.getRollbackFrequency() > 0;
+ rollbackFrequency = config.getRollbackFrequency();
+ ackFrequency = config.getAckFrequency();
+
+ _logger.debug("Ready address : " + config.getReadyAddress());
+ if (config.getReadyAddress() != null)
+ {
+ MessageProducer prod = session.createProducer(AMQDestination
+ .createDestination(config.getReadyAddress()));
+ prod.send(session.createMessage());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sending message to ready address " + prod.getDestination());
+ }
+ }
+ }
+
+ public void resetCounters()
+ {
+ received = 0;
+ expected = 0;
+ report.clear();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage &&
+ TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
+ {
+ testCompleted.countDown();
+ return;
+ }
+
+ received++;
+ report.message(msg);
+
+ if (config.isPrintHeaders())
+ {
+ System.out.println(((AbstractJMSMessage)msg).toHeaderString());
+ }
+
+ if (config.isPrintContent())
+ {
+ System.out.println(((AbstractJMSMessage)msg).toBodyString());
+ }
+
+ if (transacted && (received % txSize == 0))
+ {
+ if (isRollback && (received % rollbackFrequency == 0))
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ else if (ackFrequency > 0)
+ {
+ msg.acknowledge();
+ }
+
+ if (received >= expected)
+ {
+ testCompleted.countDown();
+ }
+
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error when receiving messages",e);
+ }
+ }
+
+ public void waitforCompletion(int expected) throws Exception
+ {
+ this.expected = expected;
+ testCompleted.await();
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ Reporter reporter = new BasicReporter(ThroughputAndLatency.class,
+ System.out,
+ config.reportEvery(),
+ config.isReportHeader());
+ Destination dest = AMQDestination.createDestination(config.getAddress());
+ QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
+ receiver.setUp();
+ receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS());
+ if (config.isReportTotal())
+ {
+ reporter.report();
+ }
+ receiver.tearDown();
+ }
}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
index c058b83d41..3d321dcade 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
@@ -43,249 +43,261 @@ import org.slf4j.LoggerFactory;
public class QpidSend
{
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageProducer producer;
- private MessageType msgType;
- private Message msg;
- private Object payload;
- private List<Object> payloads;
- private boolean cacheMsg = false;
- private boolean randomMsgSize = false;
- private boolean durable = false;
- private Random random;
- private int msgSizeRange = 1024;
- private int totalMsgCount = 0;
- private boolean rateLimitProducer = false;
- private boolean transacted = false;
- private int txSize = 0;
+ private Connection con;
+ private Session session;
+ private Destination dest;
+ private MessageProducer producer;
+ private MessageType msgType;
+ private Message msg;
+ private Object payload;
+ private List<Object> payloads;
+ private boolean cacheMsg = false;
+ private boolean randomMsgSize = false;
+ private boolean durable = false;
+ private Random random;
+ private int msgSizeRange = 1024;
+ private int totalMsgCount = 0;
+ private boolean rateLimitProducer = false;
+ private boolean transacted = false;
+ private int txSize = 0;
- private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
- Reporter report;
- TestConfiguration config;
+ private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
+ Reporter report;
+ TestConfiguration config;
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
+ public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest)
+ {
+ this(report,config, con, dest, UUID.randomUUID().toString());
+ }
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
+ public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
+ {
+ //System.out.println("Producer ID : " + id);
+ this.report = report;
+ this.config = config;
+ this.con = con;
+ this.dest = dest;
+ }
- public void setUp() throws Exception
- {
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
+ public void setUp() throws Exception
+ {
+ con.start();
+ if (config.isTransacted())
+ {
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
- durable = config.isDurable();
- rateLimitProducer = config.getSendRate() > 0 ? true : false;
- if (_logger.isDebugEnabled() && rateLimitProducer)
- {
- System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec");
- }
+ durable = config.isDurable();
+ rateLimitProducer = config.getSendRate() > 0 ? true : false;
+ if (_logger.isDebugEnabled() && rateLimitProducer)
+ {
+ _logger.debug("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec");
+ }
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
+ transacted = config.isTransacted();
+ txSize = config.getTransactionSize();
- msgType = MessageType.getType(config.getMessageType());
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (config.isCacheMessage())
- {
- cacheMsg = true;
- msg = createMessage(createPayload(config.getMsgSize()));
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (config.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = config.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
+ msgType = MessageType.getType(config.getMessageType());
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (config.isCacheMessage())
+ {
+ cacheMsg = true;
+ msg = createMessage(createPayload(config.getMsgSize()));
+ msg.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else if (config.isRandomMsgSize())
+ {
+ random = new Random(20080921);
+ randomMsgSize = true;
+ msgSizeRange = config.getMsgSize();
+ payloads = new ArrayList<Object>(msgSizeRange);
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(createPayload(i));
- }
- }
- else
- {
- payload = createPayload(config.getMsgSize());
- }
+ for (int i=0; i < msgSizeRange; i++)
+ {
+ payloads.add(createPayload(i));
+ }
+ }
+ else
+ {
+ payload = createPayload(config.getMsgSize());
+ }
- producer = session.createProducer(dest);
- if (_logger.isDebugEnabled())
- {
- System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName());
- }
- producer.setDisableMessageID(config.isDisableMessageID());
- producer.setDisableMessageTimestamp(config.isDisableTimestamp());
- }
+ producer = session.createProducer(dest);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName());
+ }
+ producer.setDisableMessageID(config.isDisableMessageID());
+ //we add a separate timestamp to allow interoperability with other clients.
+ producer.setDisableMessageTimestamp(true);
+ if (config.getTTL() > 0)
+ {
+ producer.setTimeToLive(config.getTTL());
+ }
+ if (config.getPriority() > 0)
+ {
+ producer.setPriority(config.getPriority());
+ }
+ }
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
+ Object createPayload(int size)
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return MessageFactory.createMessagePayload(size);
+ }
+ else
+ {
+ return MessageFactory.createMessagePayload(size).getBytes();
+ }
+ }
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
- }
+ Message createMessage(Object payload) throws Exception
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return session.createTextMessage((String)payload);
+ }
+ else
+ {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes((byte[])payload);
+ return m;
+ }
+ }
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- Message m;
+ protected Message getNextMessage() throws Exception
+ {
+ if (cacheMsg)
+ {
+ return msg;
+ }
+ else
+ {
+ Message m;
- if (!randomMsgSize)
- {
- m = createMessage(payload);
- }
- else
- {
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
- }
- m.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return m;
- }
- }
+ if (!randomMsgSize)
+ {
+ m = createMessage(payload);
+ }
+ else
+ {
+ m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
+ }
+ m.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ return m;
+ }
+ }
- public void commit() throws Exception
- {
- session.commit();
- }
+ public void commit() throws Exception
+ {
+ session.commit();
+ }
- public void send() throws Exception
- {
- send(config.getMsgCount());
- }
+ public void send() throws Exception
+ {
+ send(config.getMsgCount());
+ }
- public void send(int count) throws Exception
- {
- int sendRate = config.getSendRate();
- if (rateLimitProducer)
- {
- int iterations = count/sendRate;
- int remainder = count%sendRate;
- for (int i=0; i < iterations; i++)
- {
- long iterationStart = Clock.getTime();
- sendMessages(sendRate);
- long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs();
- long diff = Clock.SEC - elapsed;
- if (diff > 0)
- {
- // We have sent more messages in a sec than specified by the rate.
- Thread.sleep(diff);
- }
- }
- sendMessages(remainder);
- }
- else
- {
- sendMessages(count);
- }
- }
+ public void send(int count) throws Exception
+ {
+ int sendRate = config.getSendRate();
+ if (rateLimitProducer)
+ {
+ int iterations = count/sendRate;
+ int remainder = count%sendRate;
+ for (int i=0; i < iterations; i++)
+ {
+ long iterationStart = System.currentTimeMillis();
+ sendMessages(sendRate);
+ long elapsed = System.currentTimeMillis() - iterationStart;
+ long diff = Clock.SEC - elapsed;
+ if (diff > 0)
+ {
+ // We have sent more messages in a sec than specified by the rate.
+ Thread.sleep(diff);
+ }
+ }
+ sendMessages(remainder);
+ }
+ else
+ {
+ sendMessages(count);
+ }
+ }
- private void sendMessages(int count) throws Exception
- {
- boolean isTimestamp = config.isReportLatency();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- if (isTimestamp)
- {
- msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime());
- }
- producer.send(msg);
- report.message(msg);
- totalMsgCount++;
+ private void sendMessages(int count) throws Exception
+ {
+ boolean isTimestamp = !config.isDisableTimestamp();
+ long s = System.currentTimeMillis();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ if (isTimestamp)
+ {
+ msg.setLongProperty(TestConfiguration.TIMESTAMP, System.currentTimeMillis());
+ }
+ producer.send(msg);
+ //report.message(msg);
+ totalMsgCount++;
- if ( transacted && ((totalMsgCount) % txSize == 0))
- {
- session.commit();
- }
- }
- }
+ if ( transacted && ((totalMsgCount) % txSize == 0))
+ {
+ session.commit();
+ }
+ }
+ long e = System.currentTimeMillis() - s;
+ //System.out.println("Rate : " + totalMsgCount/e);
+ }
- public void resetCounters()
- {
- totalMsgCount = 0;
- report.clear();
- }
+ public void resetCounters()
+ {
+ totalMsgCount = 0;
+ report.clear();
+ }
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createMessage();
- msg.setBooleanProperty(TestConfiguration.EOS, true);
- producer.send(msg);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
+ public void sendEndMessage() throws Exception
+ {
+ Message msg = session.createTextMessage(TestConfiguration.EOS);
+ producer.send(msg);
+ }
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(Throughput.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader()
- );
- Destination dest = AMQDestination.createDestination(config.getAddress());
- QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
- sender.setUp();
- sender.send();
- if (config.getSendEOS() > 0)
- {
- sender.sendEndMessage();
- }
- if (config.isReportTotal())
- {
- reporter.report();
- }
- sender.tearDown();
- }
+ public void tearDown() throws Exception
+ {
+ session.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ Reporter reporter = new BasicReporter(Throughput.class,
+ System.out,
+ config.reportEvery(),
+ config.isReportHeader()
+ );
+ Destination dest = AMQDestination.createDestination(config.getAddress());
+ QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
+ sender.setUp();
+ sender.send();
+ if (config.getSendEOS() > 0)
+ {
+ sender.sendEndMessage();
+ }
+ if (config.isReportTotal())
+ {
+ reporter.report();
+ }
+ sender.tearDown();
+ }
}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
index 7f7df0e5e6..18870bac59 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
@@ -26,20 +26,20 @@ import javax.jms.Connection;
public interface TestConfiguration
{
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
+ enum MessageType {
+ BYTES, TEXT, MAP, OBJECT;
+
+ public static MessageType getType(String s) throws Exception
+ {
+ if ("text".equalsIgnoreCase(s))
+ {
+ return TEXT;
+ }
+ else if ("bytes".equalsIgnoreCase(s))
+ {
+ return BYTES;
+ }
+ /*else if ("map".equalsIgnoreCase(s))
{
return MAP;
}
@@ -47,80 +47,88 @@ public interface TestConfiguration
{
return OBJECT;
}*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
+ else
+ {
+ throw new Exception("Unsupported message type");
+ }
+ }
+ };
+
+ public final static String TIMESTAMP = "ts";
+
+ public final static String EOS = "eos";
+
+ public final static String SEQUENCE_NUMBER = "sn";
+
+ public String getUrl();
- public final static String TIMESTAMP = "ts";
+ public String getHost();
- public final static String EOS = "eos";
+ public int getPort();
- public final static String SEQUENCE_NUMBER = "sn";
+ public String getAddress();
- public String getUrl();
+ public long getTimeout();
- public String getHost();
+ public int getAckMode();
- public int getPort();
+ public int getMsgCount();
- public String getAddress();
+ public int getMsgSize();
- public int getAckMode();
+ public int getRandomMsgSizeStartFrom();
- public int getMsgCount();
+ public boolean isDurable();
- public int getMsgSize();
+ public boolean isTransacted();
- public int getRandomMsgSizeStartFrom();
+ public int getTransactionSize();
- public boolean isDurable();
+ public int getWarmupCount();
- public boolean isTransacted();
+ public boolean isCacheMessage();
- public int getTransactionSize();
+ public boolean isDisableMessageID();
- public int getWarmupCount();
+ public boolean isDisableTimestamp();
- public boolean isCacheMessage();
+ public boolean isRandomMsgSize();
- public boolean isDisableMessageID();
+ public String getMessageType();
- public boolean isDisableTimestamp();
+ public boolean isPrintStdDev();
- public boolean isRandomMsgSize();
+ public int getSendRate();
- public String getMessageType();
+ public boolean isExternalController();
- public boolean isPrintStdDev();
+ public boolean isUseUniqueDests();
- public int getSendRate();
+ public int getAckFrequency();
- public boolean isExternalController();
+ public Connection createConnection() throws Exception;
- public boolean isUseUniqueDests();
+ public DecimalFormat getDecimalFormat();
- public int getAckFrequency();
+ public int reportEvery();
- public Connection createConnection() throws Exception;
+ public boolean isReportTotal();
- public DecimalFormat getDecimalFormat();
+ public boolean isReportHeader();
- public int reportEvery();
+ public int getSendEOS();
- public boolean isReportTotal();
+ public int getConnectionCount();
- public boolean isReportHeader();
+ public int getRollbackFrequency();
- public boolean isReportLatency();
+ public boolean isPrintHeaders();
- public int getSendEOS();
+ public boolean isPrintContent();
- public int getConnectionCount();
+ public long getTTL();
- public int getRollbackFrequency();
+ public int getPriority();
- public boolean isPrintHeaders();
+ public String getReadyAddress();
} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
index 73efd1f1e0..db8b4ddcee 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
@@ -25,115 +25,121 @@ import java.text.DecimalFormat;
import javax.jms.Message;
+import org.apache.qpid.tools.TestConfiguration;
+
public interface Statistics
{
- public void message(Message msg);
- public void report(PrintStream out);
- public void header(PrintStream out);
- public void clear();
-
- static class Throughput implements Statistics
- {
- DecimalFormat df = new DecimalFormat("###.##");
- int messages = 0;
- long start = 0;
- boolean started = false;
-
- @Override
- public void message(Message msg)
- {
- ++messages;
- if (!started)
- {
- start = System.currentTimeMillis();
- started = true;
- }
- }
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- out.print(df.format((double)messages/(double)elapsed));
- }
-
- @Override
- public void header(PrintStream out)
- {
- out.print("tp(m/s)");
- }
-
- public void clear()
- {
- messages = 0;
- start = 0;
- started = false;
- }
- }
-
- static class ThroughputAndLatency extends Throughput
- {
- long minLatency = Long.MAX_VALUE;
- long maxLatency = Long.MIN_VALUE;
- double totalLatency = 0;
- int sampleCount = 0;
-
- @Override
- public void message(Message msg)
- {
- super.message(msg);
- try
- {
- long ts = msg.getLongProperty("ts");
- long latency = System.currentTimeMillis() - ts;
- minLatency = Math.min(latency, minLatency);
- maxLatency = Math.min(latency, maxLatency);
- totalLatency = totalLatency + latency;
- sampleCount++;
- }
- catch(Exception e)
- {
- System.out.println("Error calculating latency");
- }
- }
-
- @Override
- public void report(PrintStream out)
- {
- super.report(out);
- double avgLatency = totalLatency/(double)sampleCount;
- out.append('\t')
- .append(String.valueOf(minLatency))
- .append('\t')
- .append(String.valueOf(maxLatency))
- .append('\t')
- .append(df.format(avgLatency));
-
- out.flush();
- }
-
- @Override
- public void header(PrintStream out)
- {
- super.header(out);
- out.append('\t')
- .append("l-min")
- .append('\t')
- .append("l-max")
- .append('\t')
- .append("l-avg");
-
- out.flush();
- }
-
- public void clear()
- {
- super.clear();
- minLatency = 0;
- maxLatency = 0;
- totalLatency = 0;
- sampleCount = 0;
- }
- }
+ public void message(Message msg);
+ public void report(PrintStream out);
+ public void header(PrintStream out);
+ public void clear();
+
+ static class Throughput implements Statistics
+ {
+ DecimalFormat df = new DecimalFormat("###");
+ int messages = 0;
+ long start = 0;
+ boolean started = false;
+
+ @Override
+ public void message(Message msg)
+ {
+ ++messages;
+ if (!started)
+ {
+ start = System.currentTimeMillis();
+ started = true;
+ }
+ }
+
+ @Override
+ public void report(PrintStream out)
+ {
+ long elapsed = System.currentTimeMillis() - start;
+ out.println(df.format((double)messages/(double)elapsed));
+ }
+
+ @Override
+ public void header(PrintStream out)
+ {
+ out.println("tp(m/s)");
+ }
+
+ public void clear()
+ {
+ messages = 0;
+ start = 0;
+ started = false;
+ }
+ }
+
+ static class ThroughputAndLatency extends Throughput
+ {
+ long minLatency = Long.MAX_VALUE;
+ long maxLatency = Long.MIN_VALUE;
+ double totalLatency = 0;
+ int sampleCount = 0;
+
+ @Override
+ public void message(Message msg)
+ {
+ super.message(msg);
+ try
+ {
+ long ts = msg.getLongProperty(TestConfiguration.TIMESTAMP);
+ long latency = System.currentTimeMillis() - ts;
+ minLatency = Math.min(latency, minLatency);
+ maxLatency = Math.max(latency, maxLatency);
+ totalLatency = totalLatency + latency;
+ sampleCount++;
+ }
+ catch(Exception e)
+ {
+ System.out.println("Error calculating latency " + e);
+ }
+ }
+
+ @Override
+ public void report(PrintStream out)
+ {
+ long elapsed = System.currentTimeMillis() - start;
+ double rate = (double)messages/(double)elapsed;
+ double avgLatency = totalLatency/(double)sampleCount;
+ out.append("\n")
+ .append(df.format(rate))
+ .append('\t')
+ .append(String.valueOf(minLatency))
+ .append('\t')
+ .append(String.valueOf(maxLatency))
+ .append('\t')
+ .append(df.format(avgLatency))
+ .append("\n");
+
+ out.flush();
+ }
+
+ @Override
+ public void header(PrintStream out)
+ {
+ out.append("tp(m/s)")
+ .append('\t')
+ .append("l-min")
+ .append('\t')
+ .append("l-max")
+ .append('\t')
+ .append("l-avg");
+
+ out.flush();
+ }
+
+ public void clear()
+ {
+ super.clear();
+ minLatency = 0;
+ maxLatency = 0;
+ totalLatency = 0;
+ sampleCount = 0;
+ }
+ }
}