diff options
Diffstat (limited to 'trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java')
-rw-r--r-- | trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 904 |
1 files changed, 0 insertions, 904 deletions
diff --git a/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java deleted file mode 100644 index 602fcc6321..0000000000 --- a/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ /dev/null @@ -1,904 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import static org.apache.qpid.tools.QpidBench.Mode.BOTH; -import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; -import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; - -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.ExchangeBind; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageSubscribe; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.QueueDeclare; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.util.UUIDGen; -import org.apache.qpid.util.UUIDs; - -/** - * QpidBench - * - */ - -public class QpidBench -{ - - static enum Mode - { - PUBLISH, CONSUME, BOTH - } - - private static class Options - { - private StringBuilder usage = new StringBuilder("qpid-bench <options>"); - - void usage(String name, String description, Object def) - { - String defval = ""; - if (def != null) - { - defval = String.format(" (%s)", def); - } - usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); - } - - public String broker = "localhost"; - public int port = 5672; - public long count = 1000000; - public long window = 100000; - public long sample = window; - public int size = 1024; - public Mode mode = BOTH; - public boolean timestamp = false; - public boolean message_id = false; - public boolean message_cache = false; - public boolean persistent = false; - public boolean jms_publish = false; - public boolean jms_consume = false; - public boolean help = false; - - { - usage("-b, --broker", "the broker hostname", broker); - } - - public void parse__broker(String b) - { - this.broker = b; - } - - public void parse_b(String b) - { - parse__broker(b); - } - - { - usage("-p, --port", "the broker port", port); - } - - public void parse__port(String p) - { - this.port = Integer.parseInt(p); - } - - public void parse_p(String p) - { - parse__port(p); - } - - { - usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); - } - - public void parse__count(String c) - { - this.count = Long.parseLong(c); - } - - public void parse_c(String c) - { - parse__count(c); - } - - { - usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); - } - - public void parse__window(String w) - { - this.window = Long.parseLong(w); - } - - public void parse_w(String w) - { - parse__window(w); - } - - { - usage("--sample", "print stats after this many messages, 0 disables", sample); - } - - public void parse__sample(String s) - { - this.sample = Long.parseLong(s); - } - - { - usage("-i, --interval", "sets both --window and --sample", window); - } - - public void parse__interval(String i) - { - this.window = Long.parseLong(i); - this.sample = window; - } - - public void parse_i(String i) - { - parse__interval(i); - } - - { - usage("-s, --size", "the message size", size); - } - - public void parse__size(String s) - { - this.size = Integer.parseInt(s); - } - - public void parse_s(String s) - { - parse__size(s); - } - - { - usage("-m, --mode", "one of publish, consume, or both", mode); - } - - public void parse__mode(String m) - { - if (m.equalsIgnoreCase("publish")) - { - this.mode = PUBLISH; - } - else if (m.equalsIgnoreCase("consume")) - { - this.mode = CONSUME; - } - else if (m.equalsIgnoreCase("both")) - { - this.mode = BOTH; - } - else - { - throw new IllegalArgumentException - ("must be one of 'publish', 'consume', or 'both'"); - } - } - - public void parse_m(String m) - { - parse__mode(m); - } - - { - usage("--timestamp", "set timestamps on each message if true", timestamp); - } - - public void parse__timestamp(String t) - { - this.timestamp = Boolean.parseBoolean(t); - } - - { - usage("--mesage-id", "set the message-id on each message if true", message_id); - } - - public void parse__message_id(String m) - { - this.message_id = Boolean.parseBoolean(m); - } - - { - usage("--message-cache", "reuse the same message for each send if true", message_cache); - } - - public void parse__message_cache(String c) - { - this.message_cache = Boolean.parseBoolean(c); - } - - { - usage("--persistent", "set the delivery-mode to persistent if true", persistent); - } - - public void parse__persistent(String p) - { - this.persistent = Boolean.parseBoolean(p); - } - - { - usage("--jms-publish", "use the jms client for publish", jms_publish); - } - - public void parse__jms_publish(String jp) - { - this.jms_publish = Boolean.parseBoolean(jp); - } - - { - usage("--jms-consume", "use the jms client for consume", jms_consume); - } - - public void parse__jms_consume(String jc) - { - this.jms_consume = Boolean.parseBoolean(jc); - } - - { - usage("--jms", "sets both --jms-publish and --jms-consume", false); - } - - public void parse__jms(String j) - { - this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); - } - - { - usage("-h, --help", "prints this message", null); - } - - public void parse__help() - { - this.help = true; - } - - public void parse_h() - { - parse__help(); - } - - public String parse(String ... args) - { - Class klass = getClass(); - List<String> arguments = new ArrayList<String>(); - for (int i = 0; i < args.length; i++) - { - String option = args[i]; - - if (!option.startsWith("-")) - { - arguments.add(option); - continue; - } - - String method = "parse" + option.replace('-', '_'); - try - { - try - { - Method parser = klass.getMethod(method); - parser.invoke(this); - } - catch (NoSuchMethodException e) - { - try - { - Method parser = klass.getMethod(method, String.class); - - String value = null; - if (i + 1 < args.length) - { - value = args[i+1]; - i++; - } - else - { - return option + " requires a value"; - } - - parser.invoke(this, value); - } - catch (NoSuchMethodException e2) - { - return "no such option: " + option; - } - } - } - catch (InvocationTargetException e) - { - Throwable t = e.getCause(); - return String.format - ("error parsing %s: %s: %s", option, t.getClass().getName(), - t.getMessage()); - } - catch (IllegalAccessException e) - { - throw new RuntimeException - ("unable to access parse method: " + option, e); - } - } - - return parseArguments(arguments); - } - - public String parseArguments(List<String> arguments) - { - if (arguments.size() > 0) - { - String args = arguments.toString(); - return "unrecognized arguments: " + args.substring(1, args.length() - 1); - } - else - { - return null; - } - } - - public String toString() - { - Class klass = getClass(); - Field[] fields = klass.getFields(); - StringBuilder str = new StringBuilder(); - for (int i = 0; i < fields.length; i++) - { - if (i > 0) - { - str.append("\n"); - } - - String name = fields[i].getName(); - str.append(name); - str.append(" = "); - Object value; - try - { - value = fields[i].get(this); - } - catch (IllegalAccessException e) - { - throw new RuntimeException - ("unable to access field: " + name, e); - } - str.append(value); - } - - return str.toString(); - } - } - - public static final void main(String[] args) throws Exception - { - final Options opts = new Options(); - String error = opts.parse(args); - if (error != null) - { - System.err.println(error); - System.exit(-1); - return; - } - - if (opts.help) - { - System.out.println(opts.usage); - return; - } - - System.out.println(opts); - - switch (opts.mode) - { - case CONSUME: - case BOTH: - Runnable r = new Runnable() - { - public void run() - { - try - { - if (opts.jms_consume) - { - jms_consumer(opts); - } - else - { - native_consumer(opts); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - System.out.println("Consumer Completed"); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - break; - } - - switch (opts.mode) - { - case PUBLISH: - case BOTH: - Runnable r = new Runnable() - { - public void run() - { - try - { - if (opts.jms_publish) - { - jms_publisher(opts); - } - else - { - native_publisher(opts); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - System.out.println("Producer Completed"); - } - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating publisher thread",e); - } - t.start(); - break; - } - } - - private static enum Column - { - LEFT, RIGHT - } - - private static final void sample(Options opts, Column col, String name, long count, - long start, long time, long lastTime) - { - String pfx = ""; - String sfx = ""; - if (opts.mode == BOTH) - { - if (col == Column.RIGHT) - { - pfx = " -- "; - } - else - { - sfx = " --"; - } - } - - if (count == 0) - { - String stats = String.format("%s: %tc", name, start); - System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); - return; - } - - double cumulative = 1000 * (double) count / (double) (time - start); - double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); - - String stats = String.format - ("%s: %d %.2f %.2f", name, count, cumulative, interval); - System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); - } - - private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception - { - String url = String.format - ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", - opts.broker, opts.port); - return new AMQConnection(url); - } - - private static final void jms_publisher(Options opts) throws Exception - { - javax.jms.Connection conn = getJMSConnection(opts); - - javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createQueue("test-queue"); - Destination echo_dest = ssn.createQueue("echo-queue"); - MessageProducer prod = ssn.createProducer(dest); - MessageConsumer cons = ssn.createConsumer(echo_dest); - prod.setDisableMessageID(!opts.message_id); - prod.setDisableMessageTimestamp(!opts.timestamp); - prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - StringBuilder str = new StringBuilder(); - for (int i = 0; i < opts.size; i++) - { - str.append((char) (i % 128)); - } - - String body = str.toString(); - - TextMessage cached = ssn.createTextMessage(); - cached.setText(body); - - conn.start(); - - long count = 0; - long lastTime = 0; - long start = System.currentTimeMillis(); - while (opts.count == 0 || count < opts.count) - { - if (opts.window > 0 && (count % opts.window) == 0 && count > 0) - { - Message echo = cons.receive(); - } - - if (opts.sample > 0 && (count % opts.sample) == 0) - { - long time = System.currentTimeMillis(); - sample(opts, Column.LEFT, "JP", count, start, time, lastTime); - lastTime = time; - } - - TextMessage m; - if (opts.message_cache) - { - m = cached; - } - else - { - m = ssn.createTextMessage(); - m.setText(body); - } - - prod.send(m); - count++; - } - - conn.close(); - } - - private static final void jms_consumer(final Options opts) throws Exception - { - final javax.jms.Connection conn = getJMSConnection(opts); - javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createQueue("test-queue"); - Destination echo_dest = ssn.createQueue("echo-queue"); - MessageConsumer cons = ssn.createConsumer(dest); - final MessageProducer prod = ssn.createProducer(echo_dest); - prod.setDisableMessageID(true); - prod.setDisableMessageTimestamp(true); - prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - final TextMessage echo = ssn.createTextMessage(); - echo.setText("ECHO"); - - final Object done = new Object(); - cons.setMessageListener(new MessageListener() - { - private long count = 0; - private long lastTime = 0; - private long start; - - public void onMessage(Message m) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - try - { - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - prod.send(echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); - lastTime = time; - } - } - catch (JMSException e) - { - throw new RuntimeException(e); - } - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - conn.start(); - synchronized (done) - { - done.wait(); - } - conn.close(); - } - - private static final org.apache.qpid.transport.Connection getConnection - (Options opts) - { - org.apache.qpid.transport.Connection conn = - new org.apache.qpid.transport.Connection(); - conn.connect(opts.broker, opts.port, null, "guest", "guest",false); - return conn; - } - - private static abstract class NativeListener implements SessionListener - { - - public void opened(org.apache.qpid.transport.Session ssn) {} - - public void resumed(org.apache.qpid.transport.Session ssn) {} - - public void exception(org.apache.qpid.transport.Session ssn, - SessionException exc) - { - exc.printStackTrace(); - } - - public void closed(org.apache.qpid.transport.Session ssn) {} - - } - - private static final void native_publisher(Options opts) throws Exception - { - final long[] echos = { 0 }; - org.apache.qpid.transport.Connection conn = getConnection(opts); - org.apache.qpid.transport.Session ssn = conn.createSession(); - ssn.setSessionListener(new NativeListener() - { - public void message(org.apache.qpid.transport.Session ssn, - MessageTransfer xfr) - { - synchronized (echos) - { - echos[0]++; - echos.notify(); - } - ssn.processed(xfr); - } - }); - - ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); - ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); - - MessageProperties cached_mp = new MessageProperties(); - DeliveryProperties cached_dp = new DeliveryProperties(); - cached_dp.setRoutingKey("test-queue"); - cached_dp.setDeliveryMode - (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); - - int size = opts.size; - ByteBuffer body = ByteBuffer.allocate(size); - for (int i = 0; i < size; i++) - { - body.put((byte) i); - } - body.flip(); - - ssn.invoke(new MessageSubscribe() - .queue("echo-queue") - .destination("echo-queue") - .acceptMode(MessageAcceptMode.NONE) - .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); - ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); - ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); - ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); - - UUIDGen gen = UUIDs.newGenerator(); - - long count = 0; - long lastTime = 0; - long start = System.currentTimeMillis(); - while (opts.count == 0 || count < opts.count) - { - if (opts.window > 0 && (count % opts.window) == 0 && count > 0) - { - synchronized (echos) - { - while (echos[0] < (count/opts.window)) - { - echos.wait(); - } - } - } - - if (opts.sample > 0 && (count % opts.sample) == 0) - { - long time = System.currentTimeMillis(); - sample(opts, Column.LEFT, "NP", count, start, time, lastTime); - lastTime = time; - } - - MessageProperties mp; - DeliveryProperties dp; - if (opts.message_cache) - { - mp = cached_mp; - dp = cached_dp; - } - else - { - mp = new MessageProperties(); - dp = new DeliveryProperties(); - dp.setRoutingKey("test-queue"); - dp.setDeliveryMode - (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); - - } - - if (opts.message_id) - { - mp.setMessageId(gen.generate()); - } - - if (opts.timestamp) - { - dp.setTimestamp(System.currentTimeMillis()); - } - - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), body.slice()); - count++; - } - - ssn.messageCancel("echo-queue"); - - ssn.sync(); - ssn.close(); - conn.close(); - } - - private static final void native_consumer(final Options opts) throws Exception - { - final DeliveryProperties dp = new DeliveryProperties(); - final byte[] echo = new byte[0]; - dp.setRoutingKey("echo-queue"); - dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); - final MessageProperties mp = new MessageProperties(); - final Object done = new Object(); - org.apache.qpid.transport.Connection conn = getConnection(opts); - org.apache.qpid.transport.Session ssn = conn.createSession(); - ssn.setSessionListener(new NativeListener() - { - private long count = 0; - private long lastTime = 0; - private long start; - - public void message(org.apache.qpid.transport.Session ssn, - MessageTransfer xfr) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - ssn.messageTransfer("amq.direct", - MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), - echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); - lastTime = time; - } - ssn.processed(xfr); - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); - ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); - - ssn.invoke(new MessageSubscribe() - .queue("test-queue") - .destination("test-queue") - .acceptMode(MessageAcceptMode.NONE) - .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); - ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); - ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); - ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); - - synchronized (done) - { - done.wait(); - } - - ssn.messageCancel("test-queue"); - - ssn.sync(); - ssn.close(); - conn.close(); - } - -} |