diff options
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java')
-rw-r--r-- | qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java | 904 |
1 files changed, 904 insertions, 0 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java new file mode 100644 index 0000000000..602fcc6321 --- /dev/null +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -0,0 +1,904 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import static org.apache.qpid.tools.QpidBench.Mode.BOTH; +import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; +import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.ExchangeBind; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageSubscribe; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.QueueDeclare; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; + +/** + * QpidBench + * + */ + +public class QpidBench +{ + + static enum Mode + { + PUBLISH, CONSUME, BOTH + } + + private static class Options + { + private StringBuilder usage = new StringBuilder("qpid-bench <options>"); + + void usage(String name, String description, Object def) + { + String defval = ""; + if (def != null) + { + defval = String.format(" (%s)", def); + } + usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); + } + + public String broker = "localhost"; + public int port = 5672; + public long count = 1000000; + public long window = 100000; + public long sample = window; + public int size = 1024; + public Mode mode = BOTH; + public boolean timestamp = false; + public boolean message_id = false; + public boolean message_cache = false; + public boolean persistent = false; + public boolean jms_publish = false; + public boolean jms_consume = false; + public boolean help = false; + + { + usage("-b, --broker", "the broker hostname", broker); + } + + public void parse__broker(String b) + { + this.broker = b; + } + + public void parse_b(String b) + { + parse__broker(b); + } + + { + usage("-p, --port", "the broker port", port); + } + + public void parse__port(String p) + { + this.port = Integer.parseInt(p); + } + + public void parse_p(String p) + { + parse__port(p); + } + + { + usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); + } + + public void parse__count(String c) + { + this.count = Long.parseLong(c); + } + + public void parse_c(String c) + { + parse__count(c); + } + + { + usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); + } + + public void parse__window(String w) + { + this.window = Long.parseLong(w); + } + + public void parse_w(String w) + { + parse__window(w); + } + + { + usage("--sample", "print stats after this many messages, 0 disables", sample); + } + + public void parse__sample(String s) + { + this.sample = Long.parseLong(s); + } + + { + usage("-i, --interval", "sets both --window and --sample", window); + } + + public void parse__interval(String i) + { + this.window = Long.parseLong(i); + this.sample = window; + } + + public void parse_i(String i) + { + parse__interval(i); + } + + { + usage("-s, --size", "the message size", size); + } + + public void parse__size(String s) + { + this.size = Integer.parseInt(s); + } + + public void parse_s(String s) + { + parse__size(s); + } + + { + usage("-m, --mode", "one of publish, consume, or both", mode); + } + + public void parse__mode(String m) + { + if (m.equalsIgnoreCase("publish")) + { + this.mode = PUBLISH; + } + else if (m.equalsIgnoreCase("consume")) + { + this.mode = CONSUME; + } + else if (m.equalsIgnoreCase("both")) + { + this.mode = BOTH; + } + else + { + throw new IllegalArgumentException + ("must be one of 'publish', 'consume', or 'both'"); + } + } + + public void parse_m(String m) + { + parse__mode(m); + } + + { + usage("--timestamp", "set timestamps on each message if true", timestamp); + } + + public void parse__timestamp(String t) + { + this.timestamp = Boolean.parseBoolean(t); + } + + { + usage("--mesage-id", "set the message-id on each message if true", message_id); + } + + public void parse__message_id(String m) + { + this.message_id = Boolean.parseBoolean(m); + } + + { + usage("--message-cache", "reuse the same message for each send if true", message_cache); + } + + public void parse__message_cache(String c) + { + this.message_cache = Boolean.parseBoolean(c); + } + + { + usage("--persistent", "set the delivery-mode to persistent if true", persistent); + } + + public void parse__persistent(String p) + { + this.persistent = Boolean.parseBoolean(p); + } + + { + usage("--jms-publish", "use the jms client for publish", jms_publish); + } + + public void parse__jms_publish(String jp) + { + this.jms_publish = Boolean.parseBoolean(jp); + } + + { + usage("--jms-consume", "use the jms client for consume", jms_consume); + } + + public void parse__jms_consume(String jc) + { + this.jms_consume = Boolean.parseBoolean(jc); + } + + { + usage("--jms", "sets both --jms-publish and --jms-consume", false); + } + + public void parse__jms(String j) + { + this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); + } + + { + usage("-h, --help", "prints this message", null); + } + + public void parse__help() + { + this.help = true; + } + + public void parse_h() + { + parse__help(); + } + + public String parse(String ... args) + { + Class klass = getClass(); + List<String> arguments = new ArrayList<String>(); + for (int i = 0; i < args.length; i++) + { + String option = args[i]; + + if (!option.startsWith("-")) + { + arguments.add(option); + continue; + } + + String method = "parse" + option.replace('-', '_'); + try + { + try + { + Method parser = klass.getMethod(method); + parser.invoke(this); + } + catch (NoSuchMethodException e) + { + try + { + Method parser = klass.getMethod(method, String.class); + + String value = null; + if (i + 1 < args.length) + { + value = args[i+1]; + i++; + } + else + { + return option + " requires a value"; + } + + parser.invoke(this, value); + } + catch (NoSuchMethodException e2) + { + return "no such option: " + option; + } + } + } + catch (InvocationTargetException e) + { + Throwable t = e.getCause(); + return String.format + ("error parsing %s: %s: %s", option, t.getClass().getName(), + t.getMessage()); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access parse method: " + option, e); + } + } + + return parseArguments(arguments); + } + + public String parseArguments(List<String> arguments) + { + if (arguments.size() > 0) + { + String args = arguments.toString(); + return "unrecognized arguments: " + args.substring(1, args.length() - 1); + } + else + { + return null; + } + } + + public String toString() + { + Class klass = getClass(); + Field[] fields = klass.getFields(); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < fields.length; i++) + { + if (i > 0) + { + str.append("\n"); + } + + String name = fields[i].getName(); + str.append(name); + str.append(" = "); + Object value; + try + { + value = fields[i].get(this); + } + catch (IllegalAccessException e) + { + throw new RuntimeException + ("unable to access field: " + name, e); + } + str.append(value); + } + + return str.toString(); + } + } + + public static final void main(String[] args) throws Exception + { + final Options opts = new Options(); + String error = opts.parse(args); + if (error != null) + { + System.err.println(error); + System.exit(-1); + return; + } + + if (opts.help) + { + System.out.println(opts.usage); + return; + } + + System.out.println(opts); + + switch (opts.mode) + { + case CONSUME: + case BOTH: + Runnable r = new Runnable() + { + public void run() + { + try + { + if (opts.jms_consume) + { + jms_consumer(opts); + } + else + { + native_consumer(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + System.out.println("Consumer Completed"); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + break; + } + + switch (opts.mode) + { + case PUBLISH: + case BOTH: + Runnable r = new Runnable() + { + public void run() + { + try + { + if (opts.jms_publish) + { + jms_publisher(opts); + } + else + { + native_publisher(opts); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + System.out.println("Producer Completed"); + } + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating publisher thread",e); + } + t.start(); + break; + } + } + + private static enum Column + { + LEFT, RIGHT + } + + private static final void sample(Options opts, Column col, String name, long count, + long start, long time, long lastTime) + { + String pfx = ""; + String sfx = ""; + if (opts.mode == BOTH) + { + if (col == Column.RIGHT) + { + pfx = " -- "; + } + else + { + sfx = " --"; + } + } + + if (count == 0) + { + String stats = String.format("%s: %tc", name, start); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + return; + } + + double cumulative = 1000 * (double) count / (double) (time - start); + double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); + + String stats = String.format + ("%s: %d %.2f %.2f", name, count, cumulative, interval); + System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); + } + + private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception + { + String url = String.format + ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", + opts.broker, opts.port); + return new AMQConnection(url); + } + + private static final void jms_publisher(Options opts) throws Exception + { + javax.jms.Connection conn = getJMSConnection(opts); + + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageProducer prod = ssn.createProducer(dest); + MessageConsumer cons = ssn.createConsumer(echo_dest); + prod.setDisableMessageID(!opts.message_id); + prod.setDisableMessageTimestamp(!opts.timestamp); + prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + StringBuilder str = new StringBuilder(); + for (int i = 0; i < opts.size; i++) + { + str.append((char) (i % 128)); + } + + String body = str.toString(); + + TextMessage cached = ssn.createTextMessage(); + cached.setText(body); + + conn.start(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + Message echo = cons.receive(); + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "JP", count, start, time, lastTime); + lastTime = time; + } + + TextMessage m; + if (opts.message_cache) + { + m = cached; + } + else + { + m = ssn.createTextMessage(); + m.setText(body); + } + + prod.send(m); + count++; + } + + conn.close(); + } + + private static final void jms_consumer(final Options opts) throws Exception + { + final javax.jms.Connection conn = getJMSConnection(opts); + javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); + Destination dest = ssn.createQueue("test-queue"); + Destination echo_dest = ssn.createQueue("echo-queue"); + MessageConsumer cons = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(echo_dest); + prod.setDisableMessageID(true); + prod.setDisableMessageTimestamp(true); + prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + final TextMessage echo = ssn.createTextMessage(); + echo.setText("ECHO"); + + final Object done = new Object(); + cons.setMessageListener(new MessageListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void onMessage(Message m) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + try + { + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + prod.send(echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); + lastTime = time; + } + } + catch (JMSException e) + { + throw new RuntimeException(e); + } + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + conn.start(); + synchronized (done) + { + done.wait(); + } + conn.close(); + } + + private static final org.apache.qpid.transport.Connection getConnection + (Options opts) + { + org.apache.qpid.transport.Connection conn = + new org.apache.qpid.transport.Connection(); + conn.connect(opts.broker, opts.port, null, "guest", "guest",false); + return conn; + } + + private static abstract class NativeListener implements SessionListener + { + + public void opened(org.apache.qpid.transport.Session ssn) {} + + public void resumed(org.apache.qpid.transport.Session ssn) {} + + public void exception(org.apache.qpid.transport.Session ssn, + SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(org.apache.qpid.transport.Session ssn) {} + + } + + private static final void native_publisher(Options opts) throws Exception + { + final long[] echos = { 0 }; + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + synchronized (echos) + { + echos[0]++; + echos.notify(); + } + ssn.processed(xfr); + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + MessageProperties cached_mp = new MessageProperties(); + DeliveryProperties cached_dp = new DeliveryProperties(); + cached_dp.setRoutingKey("test-queue"); + cached_dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + int size = opts.size; + ByteBuffer body = ByteBuffer.allocate(size); + for (int i = 0; i < size; i++) + { + body.put((byte) i); + } + body.flip(); + + ssn.invoke(new MessageSubscribe() + .queue("echo-queue") + .destination("echo-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + UUIDGen gen = UUIDs.newGenerator(); + + long count = 0; + long lastTime = 0; + long start = System.currentTimeMillis(); + while (opts.count == 0 || count < opts.count) + { + if (opts.window > 0 && (count % opts.window) == 0 && count > 0) + { + synchronized (echos) + { + while (echos[0] < (count/opts.window)) + { + echos.wait(); + } + } + } + + if (opts.sample > 0 && (count % opts.sample) == 0) + { + long time = System.currentTimeMillis(); + sample(opts, Column.LEFT, "NP", count, start, time, lastTime); + lastTime = time; + } + + MessageProperties mp; + DeliveryProperties dp; + if (opts.message_cache) + { + mp = cached_mp; + dp = cached_dp; + } + else + { + mp = new MessageProperties(); + dp = new DeliveryProperties(); + dp.setRoutingKey("test-queue"); + dp.setDeliveryMode + (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); + + } + + if (opts.message_id) + { + mp.setMessageId(gen.generate()); + } + + if (opts.timestamp) + { + dp.setTimestamp(System.currentTimeMillis()); + } + + ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), body.slice()); + count++; + } + + ssn.messageCancel("echo-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + + private static final void native_consumer(final Options opts) throws Exception + { + final DeliveryProperties dp = new DeliveryProperties(); + final byte[] echo = new byte[0]; + dp.setRoutingKey("echo-queue"); + dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); + final MessageProperties mp = new MessageProperties(); + final Object done = new Object(); + org.apache.qpid.transport.Connection conn = getConnection(opts); + org.apache.qpid.transport.Session ssn = conn.createSession(); + ssn.setSessionListener(new NativeListener() + { + private long count = 0; + private long lastTime = 0; + private long start; + + public void message(org.apache.qpid.transport.Session ssn, + MessageTransfer xfr) + { + if (count == 0) + { + start = System.currentTimeMillis(); + } + + boolean sample = opts.sample > 0 && (count % opts.sample) == 0; + long time = sample ? System.currentTimeMillis() : 0; + + if (opts.window > 0 && (count % opts.window) == 0) + { + ssn.messageTransfer("amq.direct", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + new Header(dp, mp), + echo); + } + + if (sample) + { + sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); + lastTime = time; + } + ssn.processed(xfr); + count++; + + if (opts.count > 0 && count >= opts.count) + { + synchronized (done) + { + done.notify(); + } + } + } + }); + + ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); + ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); + ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); + + ssn.invoke(new MessageSubscribe() + .queue("test-queue") + .destination("test-queue") + .acceptMode(MessageAcceptMode.NONE) + .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); + ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); + ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); + ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); + + synchronized (done) + { + done.wait(); + } + + ssn.messageCancel("test-queue"); + + ssn.sync(); + ssn.close(); + conn.close(); + } + +} |