summaryrefslogtreecommitdiff
path: root/qpid/java/tools/src/main/java/org/apache/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools/src/main/java/org/apache/qpid')
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java154
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java27
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java216
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java197
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java370
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java200
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java349
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java64
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java78
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java267
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java262
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java904
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java170
13 files changed, 3258 insertions, 0 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
new file mode 100644
index 0000000000..b10129d855
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public abstract class Client implements ExceptionListener
+{
+ private Connection con;
+ private Session ssn;
+ private boolean durable = false;
+ private boolean transacted = false;
+ private int txSize = 10;
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+ private String contentType = "application/octet-stream";
+
+ private long reportFrequency = 60000; // every min
+
+ private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ private NumberFormat nf = new DecimalFormat("##.00");
+
+ private long startTime = System.currentTimeMillis();
+ private ErrorHandler errorHandler = null;
+
+ public Client(Connection con) throws Exception
+ {
+ this.con = con;
+ this.con.setExceptionListener(this);
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ txSize = Integer.getInteger("tx_size",10);
+ contentType = System.getProperty("content_type","application/octet-stream");
+ reportFrequency = Long.getLong("report_frequency", 60000);
+ }
+
+ public void close()
+ {
+ try
+ {
+ con.close();
+ }
+ catch (Exception e)
+ {
+ handleError("Error closing connection",e);
+ }
+ }
+
+ public void onException(JMSException e)
+ {
+ handleError("Connection error",e);
+ }
+
+ public void setErrorHandler(ErrorHandler h)
+ {
+ this.errorHandler = h;
+ }
+
+ public void handleError(String msg,Exception e)
+ {
+ if (errorHandler != null)
+ {
+ errorHandler.handleError(msg, e);
+ }
+ else
+ {
+ System.err.println(msg);
+ e.printStackTrace();
+ }
+ }
+
+ protected Session getSsn()
+ {
+ return ssn;
+ }
+
+ protected void setSsn(Session ssn)
+ {
+ this.ssn = ssn;
+ }
+
+ protected boolean isDurable()
+ {
+ return durable;
+ }
+
+ protected boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ protected int getTxSize()
+ {
+ return txSize;
+ }
+
+ protected int getAck_mode()
+ {
+ return ack_mode;
+ }
+
+ protected String getContentType()
+ {
+ return contentType;
+ }
+
+ protected long getReportFrequency()
+ {
+ return reportFrequency;
+ }
+
+ protected long getStartTime()
+ {
+ return startTime;
+ }
+
+ protected void setStartTime(long startTime)
+ {
+ this.startTime = startTime;
+ }
+
+ public DateFormat getDf()
+ {
+ return df;
+ }
+
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
new file mode 100644
index 0000000000..dbc73c404f
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
@@ -0,0 +1,27 @@
+package org.apache.qpid.testkit;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+public interface ErrorHandler {
+
+ public void handleError(String msg,Exception e);
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
new file mode 100644
index 0000000000..b4294ee4cc
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * A generic receiver which consumes messages
+ * from a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It participates in a feedback loop to ensure the producer
+ * doesn't fill up the queue. If it receives an "End" msg
+ * it sends a reply to the replyTo address in that msg.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * sync_rcv - Whether to consume sync (instead of using a listener).
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs. *
+ * check_for_dups - check for duplicate messages and out of order messages.
+ * jms_durable_sub - create a durable subscription instead of a regular subscription.
+ */
+public class Receiver extends Client implements MessageListener
+{
+ long msg_count = 0;
+ int sequence = 0;
+ boolean syncRcv = Boolean.getBoolean("sync_rcv");
+ boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
+ boolean checkForDups = Boolean.getBoolean("check_for_dups");
+ MessageConsumer consumer;
+ List<Integer> duplicateMessages = new ArrayList<Integer>();
+
+ public Receiver(Connection con,String addr) throws Exception
+ {
+ super(con);
+ setSsn(con.createSession(isTransacted(), getAck_mode()));
+ consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
+ if (!syncRcv)
+ {
+ consumer.setMessageListener(this);
+ }
+
+ System.out.println("Receiving messages from : " + addr);
+ }
+
+ public void onMessage(Message msg)
+ {
+ handleMessage(msg);
+ }
+
+ public void run() throws Exception
+ {
+ long sleepTime = getReportFrequency();
+ while(true)
+ {
+ if(syncRcv)
+ {
+ long t = sleepTime;
+ while (t > 0)
+ {
+ long start = System.currentTimeMillis();
+ Message msg = consumer.receive(t);
+ t = t - (System.currentTimeMillis() - start);
+ handleMessage(msg);
+ }
+ }
+ Thread.sleep(sleepTime);
+ System.out.println(getDf().format(System.currentTimeMillis())
+ + " - messages received : " + msg_count);
+ }
+ }
+
+ private void handleMessage(Message m)
+ {
+ if (m == null) { return; }
+
+ try
+ {
+ if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+ {
+ MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
+ Message controlMsg = getSsn().createTextMessage();
+ temp.send(controlMsg);
+ if (isTransacted())
+ {
+ getSsn().commit();
+ }
+ temp.close();
+ }
+ else
+ {
+
+ int seq = m.getIntProperty("sequence");
+ if (checkForDups)
+ {
+ if (seq == 0)
+ {
+ sequence = 0; // wrap around for each iteration
+ System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
+ duplicateMessages.clear();
+ }
+
+ if (seq < sequence)
+ {
+ duplicateMessages.add(seq);
+ }
+ else if (seq == sequence)
+ {
+ sequence++;
+ msg_count ++;
+ }
+ else
+ {
+ // Multiple publishers are not allowed in this test case.
+ // So out of order messages are not allowed.
+ throw new Exception(": Received an out of order message (expected="
+ + sequence + ",received=" + seq + ")" );
+ }
+ }
+ else
+ {
+ msg_count ++;
+ }
+
+ // Please note that this test case doesn't expect duplicates
+ // When testing for transactions.
+ if (isTransacted() && msg_count % getTxSize() == 0)
+ {
+ getSsn().commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ handleError("Exception receiving messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+ String addr = "message_queue";
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ if (args.length > 2)
+ {
+ addr = args[2];
+ }
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ Receiver rcv = new Receiver(con,addr);
+ rcv.run();
+ }
+
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
new file mode 100644
index 0000000000..14b9b7302f
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.tools.MessageFactory;
+
+/**
+ * A generic sender which sends a stream of messages
+ * to a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It has a feedback loop to ensure it doesn't fill
+ * up queues due to a slow consumer.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * msg_size (256)
+ * msg_count (10) - # messages before waiting for feedback
+ * sleep_time (1000 ms) - sleep time btw each iteration
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs.
+ */
+public class Sender extends Client
+{
+ protected int msg_size = 256;
+ protected int msg_count = 10;
+ protected int iterations = -1;
+ protected long sleep_time = 1000;
+
+ protected Destination dest = null;
+ protected Destination replyTo = null;
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+
+ protected MessageProducer producer;
+ Random gen = new Random(19770905);
+
+ public Sender(Connection con,String addr) throws Exception
+ {
+ super(con);
+ this.msg_size = Integer.getInteger("msg_size", 100);
+ this.msg_count = Integer.getInteger("msg_count", 10);
+ this.iterations = Integer.getInteger("iterations", -1);
+ this.sleep_time = Long.getLong("sleep_time", 1000);
+ this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
+ this.dest = new AMQAnyDestination(addr);
+ this.producer = getSsn().createProducer(dest);
+ this.replyTo = getSsn().createTemporaryQueue();
+
+ System.out.println("Sending messages to : " + addr);
+ }
+
+ /*
+ * If msg_size not specified it generates a message
+ * between 500-1500 bytes.
+ */
+ protected Message getNextMessage() throws Exception
+ {
+ int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
+ Message msg = (getContentType().equals("text/plain")) ?
+ MessageFactory.createTextMessage(getSsn(), s):
+ MessageFactory.createBytesMessage(getSsn(), s);
+
+ msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT
+ : DeliveryMode.NON_PERSISTENT);
+ return msg;
+ }
+
+ public void run()
+ {
+ try
+ {
+ boolean infinite = (iterations == -1);
+ for (int x=0; infinite || x < iterations; x++)
+ {
+ long now = System.currentTimeMillis();
+ if (now - getStartTime() >= getReportFrequency())
+ {
+ System.out.println(df.format(now) + " - iterations : " + x);
+ setStartTime(now);
+ }
+
+ for (int i = 0; i < msg_count; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setIntProperty("sequence",i);
+ producer.send(msg);
+ if (isTransacted() && msg_count % getTxSize() == 0)
+ {
+ getSsn().commit();
+ }
+ }
+ TextMessage m = getSsn().createTextMessage("End");
+ m.setJMSReplyTo(replyTo);
+ producer.send(m);
+
+ if (isTransacted())
+ {
+ getSsn().commit();
+ }
+
+ MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
+ feedbackConsumer.receive();
+ feedbackConsumer.close();
+ if (isTransacted())
+ {
+ getSsn().commit();
+ }
+ Thread.sleep(sleep_time);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception sending messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+ String addr = "message_queue";
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ if (args.length > 2)
+ {
+ addr = args[2];
+ }
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ Sender sender = new Sender(con,addr);
+ sender.run();
+ }
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
new file mode 100644
index 0000000000..72ca48e1c9
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
+
+/**
+ * A basic test case class that could launch a Sender/Receiver
+ * or both, each on it's own separate thread.
+ *
+ * If con_count == ssn_count, then each entity created will have
+ * it's own Connection. Else if con_count < ssn_count, then
+ * a connection will be shared by ssn_count/con_count # of entities.
+ *
+ * The if both sender and receiver options are set, it will
+ * share a connection.
+ *
+ * The following options are available as jvm args
+ * host, port
+ * con_count,ssn_count
+ * con_idle_time - which determines heartbeat
+ * sender, receiver - booleans which indicate which entity to create.
+ * Setting them both is also a valid option.
+ */
+public class TestLauncher implements ErrorHandler
+{
+ protected String host = "127.0.0.1";
+ protected int port = 5672;
+ protected int sessions_per_con = 1;
+ protected int connection_count = 1;
+ protected long heartbeat = 5000;
+ protected boolean sender = false;
+ protected boolean receiver = false;
+ protected boolean useUniqueDests = false;
+ protected String url;
+
+ protected String address = "my_queue; {create: always}";
+ protected boolean durable = false;
+ protected String failover = "";
+ protected AMQConnection controlCon;
+ protected Destination controlDest = null;
+ protected Session controlSession = null;
+ protected MessageProducer statusSender;
+ protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+ protected String testName;
+
+ public TestLauncher()
+ {
+ testName = System.getProperty("test_name","UNKNOWN");
+ host = System.getProperty("host", "127.0.0.1");
+ port = Integer.getInteger("port", 5672);
+ sessions_per_con = Integer.getInteger("ssn_per_con", 1);
+ connection_count = Integer.getInteger("con_count", 1);
+ heartbeat = Long.getLong("heartbeat", 5);
+ sender = Boolean.getBoolean("sender");
+ receiver = Boolean.getBoolean("receiver");
+ useUniqueDests = Boolean.getBoolean("use_unique_dests");
+
+ failover = System.getProperty("failover", "");
+ durable = Boolean.getBoolean("durable");
+
+ url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
+
+ if (failover.equalsIgnoreCase("failover_exchange"))
+ {
+ url += "&failover='failover_exchange'";
+
+ System.out.println("Failover exchange " + url );
+ }
+
+ configureLogging();
+ }
+
+ protected void configureLogging()
+ {
+ PatternLayout layout = new PatternLayout();
+ layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
+ BasicConfigurator.configure(new ConsoleAppender(layout));
+
+ String logLevel = System.getProperty("log.level","warn");
+ String logComponent = System.getProperty("log.comp","org.apache.qpid");
+
+ Logger logger = Logger.getLogger(logComponent);
+ logger.setLevel(Level.toLevel(logLevel, Level.WARN));
+
+ System.out.println("Level " + logger.getLevel());
+
+ }
+
+ public void setUpControlChannel()
+ {
+ try
+ {
+ controlCon = new AMQConnection(url);
+ controlCon.start();
+
+ controlDest = new AMQAnyDestination("control; {create: always}"); // durable
+
+ // Create the session to setup the messages
+ controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ statusSender = controlSession.createProducer(controlDest);
+
+ }
+ catch (Exception e)
+ {
+ handleError("Error while setting up the test",e);
+ }
+ }
+
+ public void cleanup()
+ {
+ try
+ {
+ controlSession.close();
+ controlCon.close();
+ for (AMQConnection con : clients)
+ {
+ con.close();
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Error while tearing down the test",e);
+ }
+ }
+
+ public void start(String addr)
+ {
+ try
+ {
+ if (addr == null)
+ {
+ addr = address;
+ }
+
+ int ssn_per_con = sessions_per_con;
+ String addrTemp = addr;
+ for (int i = 0; i< connection_count; i++)
+ {
+ AMQConnection con = new AMQConnection(url);
+ con.start();
+ clients.add(con);
+ for (int j = 0; j< ssn_per_con; j++)
+ {
+ String index = createPrefix(i,j);
+ if (useUniqueDests)
+ {
+ addrTemp = modifySubject(index,addr);
+ }
+
+ if (sender)
+ {
+ createSender(index,con,addrTemp,this);
+ }
+
+ if (receiver)
+ {
+ System.out.println("########## Creating receiver ##################");
+
+ createReceiver(index,con,addrTemp,this);
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception while setting up the test",e);
+ }
+
+ }
+
+ protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Receiver rcv = new Receiver(con,addr);
+ rcv.setErrorHandler(h);
+ rcv.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Receiver", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Receive thread",e);
+ }
+
+ t.setName("ReceiverThread-" + index);
+ t.start();
+ }
+
+ protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Sender sender = new Sender(con, addr);
+ sender.setErrorHandler(h);
+ sender.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Sender", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Sender thread",e);
+ }
+
+ t.setName("SenderThread-" + index);
+ t.start();
+ }
+
+ public synchronized void handleError(String msg,Exception e)
+ {
+ // In case sending the message fails
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" @ ");
+ sb.append(df.format(new Date(System.currentTimeMillis())));
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+
+ try
+ {
+ TextMessage errorMsg = controlSession.createTextMessage();
+ errorMsg.setStringProperty("status", "error");
+ errorMsg.setStringProperty("desc", msg);
+ errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
+ errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
+
+ System.out.println("Msg " + errorMsg);
+
+ statusSender.send(errorMsg);
+ }
+ catch (JMSException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+
+ private String serializeStackTrace(Exception e)
+ {
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ PrintStream printStream = new PrintStream(bOut);
+ e.printStackTrace(printStream);
+ printStream.close();
+ return bOut.toString();
+ }
+
+ private String createPrefix(int i, int j)
+ {
+ return String.valueOf(i).concat(String.valueOf(j));
+ }
+
+ /**
+ * A basic helper function to modify the subjects by
+ * appending an index.
+ */
+ private String modifySubject(String index,String addr)
+ {
+ if (addr.indexOf("/") > 0)
+ {
+ addr = addr.substring(0,addr.indexOf("/")+1) +
+ index +
+ addr.substring(addr.indexOf("/")+1,addr.length());
+ }
+ else if (addr.indexOf(";") > 0)
+ {
+ addr = addr.substring(0,addr.indexOf(";")) +
+ "/" + index +
+ addr.substring(addr.indexOf(";"),addr.length());
+ }
+ else
+ {
+ addr = addr + "/" + index;
+ }
+
+ return addr;
+ }
+
+ public static void main(String[] args)
+ {
+ final TestLauncher test = new TestLauncher();
+ test.setUpControlChannel();
+ System.out.println("args.length " + args.length);
+ System.out.println("args [0] " + args [0]);
+ test.start(args.length > 0 ? args [0] : null);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() { test.cleanup(); }
+ });
+
+ }
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
new file mode 100644
index 0000000000..2390516ef0
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tools;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.jms.FailoverPolicy;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.util.Hashtable;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.LinkedList;
+import java.io.IOException;
+import java.io.File;
+import java.io.FileInputStream;
+
+public class JNDICheck
+{
+ private static final String QUEUE = "queue.";
+ private static final String TOPIC = "topic.";
+ private static final String DESTINATION = "destination.";
+ private static final String CONNECTION_FACTORY = "connectionfactory.";
+
+ public static void main(String[] args)
+ {
+
+ if (args.length != 1)
+ {
+ usage();
+ }
+
+ String propertyFile = args[0];
+
+ new JNDICheck(propertyFile);
+ }
+
+ private static void usage()
+ {
+ exit("Usage: JNDICheck <JNDI Config file>", 0);
+ }
+
+ private static void exit(String message, int exitCode)
+ {
+ System.err.println(message);
+ System.exit(exitCode);
+ }
+
+ private static String JAVA_NAMING = "java.naming.factory.initial";
+
+ Context _context = null;
+ Hashtable _environment = null;
+
+ public JNDICheck(String propertyFile)
+ {
+
+ // Load JNDI properties
+ Properties properties = new Properties();
+
+ try
+ {
+ properties.load(new FileInputStream(new File(propertyFile)));
+ }
+ catch (IOException e)
+ {
+ exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1);
+ }
+
+ //Create the initial context
+ try
+ {
+
+ System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING));
+
+ _context = new InitialContext(properties);
+
+ _environment = _context.getEnvironment();
+
+ Enumeration keys = _environment.keys();
+
+ List<String> queues = new LinkedList<String>();
+ List<String> topics = new LinkedList<String>();
+ List<String> destinations = new LinkedList<String>();
+ List<String> connectionFactories = new LinkedList<String>();
+
+ while (keys.hasMoreElements())
+ {
+ String key = keys.nextElement().toString();
+
+ if (key.startsWith(QUEUE))
+ {
+ queues.add(key);
+ }
+ else if (key.startsWith(TOPIC))
+ {
+ topics.add(key);
+ }
+ else if (key.startsWith(DESTINATION))
+ {
+ destinations.add(key);
+ }
+ else if (key.startsWith(CONNECTION_FACTORY))
+ {
+ connectionFactories.add(key);
+ }
+ }
+
+ printHeader(propertyFile);
+ printEntries(QUEUE, queues);
+ printEntries(TOPIC, topics);
+ printEntries(DESTINATION, destinations);
+ printEntries(CONNECTION_FACTORY, connectionFactories);
+
+ }
+ catch (NamingException e)
+ {
+ exit("Unable to load JNDI Context due to:" + e.getMessage(), 1);
+ }
+
+ }
+
+ private void printHeader(String file)
+ {
+ print("JNDI file :" + file);
+ }
+
+ private void printEntries(String type, List<String> list)
+ {
+ if (list.size() > 0)
+ {
+ String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1);
+ print(name + " elements in file:");
+ printList(list);
+ print("");
+ }
+ }
+
+ private void printList(List<String> list)
+ {
+ for (String item : list)
+ {
+ String key = item.substring(item.indexOf('.') + 1);
+
+ try
+ {
+ print(key, _context.lookup(key));
+ }
+ catch (NamingException e)
+ {
+ exit("Error: item " + key + " no longer in context.", 1);
+ }
+ }
+ }
+
+ private void print(String key, Object object)
+ {
+ if (object instanceof AMQDestination)
+ {
+ print(key + ":" + object);
+ }
+ else if (object instanceof AMQConnectionFactory)
+ {
+ AMQConnectionFactory factory = (AMQConnectionFactory) object;
+ print(key + ":Connection");
+ print("ConnectionURL:");
+ print(factory.getConnectionURL().toString());
+ print("FailoverPolicy");
+ print(new FailoverPolicy(factory.getConnectionURL(),null).toString());
+ print("");
+ }
+ }
+
+ private void print(String msg)
+ {
+ System.out.println(msg);
+ }
+
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
new file mode 100644
index 0000000000..b88b242e6d
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * Latency test sends an x number of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y number of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * It is important to have a sufficiently large number for the warmup count to
+ * ensure the system is in steady state before the test is started.
+ *
+ * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000)
+ * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1
+ *
+ * The idea is to get a latency sample for the system once it achieves steady state.
+ *
+ */
+
+public class LatencyTest extends PerfBase implements MessageListener
+{
+ MessageProducer producer;
+ MessageConsumer consumer;
+ Message msg;
+ byte[] payload;
+ long maxLatency = 0;
+ long minLatency = Long.MAX_VALUE;
+ long totalLatency = 0; // to calculate avg latency.
+ int rcvdMsgCount = 0;
+ double stdDev = 0;
+ double avgLatency = 0;
+ boolean warmup_mode = true;
+ boolean transacted = false;
+ int transSize = 0;
+
+ final List<Long> latencies;
+ final Lock lock = new ReentrantLock();
+ final Condition warmedUp;
+ final Condition testCompleted;
+
+ public LatencyTest()
+ {
+ super();
+ warmedUp = lock.newCondition();
+ testCompleted = lock.newCondition();
+ // Storing the following two for efficiency
+ transacted = params.isTransacted();
+ transSize = params.getTransactionSize();
+ latencies = new ArrayList <Long>(params.getMsgCount());
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(params.isDurable()?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (params.isCacheMessage())
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+ ((BytesMessage)msg).writeBytes(payload);
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ int count = params.getWarmupCount();
+ for (int i=0; i < count; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ try
+ {
+ lock.lock();
+ warmedUp.await();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ {
+ if (warmup_mode)
+ {
+ warmup_mode = false;
+ try
+ {
+ lock.lock();
+ warmedUp.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ else
+ {
+ computeStats();
+ }
+ }
+ else if (!warmup_mode)
+ {
+ long time = System.currentTimeMillis();
+ rcvdMsgCount ++;
+
+ if (transacted && (rcvdMsgCount % transSize == 0))
+ {
+ session.commit();
+ }
+
+ long latency = time - msg.getJMSTimestamp();
+ latencies.add(latency);
+ totalLatency = totalLatency + latency;
+ }
+
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when receiving messages");
+ }
+
+ }
+
+ private void computeStats()
+ {
+ avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double sigma = 0;
+
+ for (long latency: latencies)
+ {
+ maxLatency = Math.max(maxLatency, latency);
+ minLatency = Math.min(minLatency, latency);
+ sigma = sigma + Math.pow(latency - avgLatency,2);
+ }
+
+ stdDev = Math.sqrt(sigma/(rcvdMsgCount -1));
+
+ try
+ {
+ lock.lock();
+ testCompleted.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public void writeToFile() throws Exception
+ {
+ String fileName = System.getProperty("file");
+ PrintWriter writer = new PrintWriter(new FileOutputStream(fileName));
+ for (long latency: latencies)
+ {
+ writer.println(String.valueOf(latency));
+ }
+ writer.flush();
+ writer.close();
+ }
+
+ public void printToConsole()
+ {
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+ System.out.println(new StringBuilder("Standard Deviation : ").
+ append(df.format(stdDev)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Avg Latency : ").
+ append(df.format(avgLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min Latency : ").
+ append(minLatency).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max Latency : ").
+ append(maxLatency).
+ append(" ms").toString());
+ System.out.println("Completed the test......\n");
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % transSize == 0))
+ {
+ session.commit();
+ }
+ }
+ Message msg = session.createTextMessage("End");
+ producer.send(msg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ lock.lock();
+ testCompleted.await();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ producer.close();
+ consumer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ final LatencyTest latencyTest = new LatencyTest();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ latencyTest.test();
+ latencyTest.printToConsole();
+ if (System.getProperty("file") != null)
+ {
+ try
+ {
+ latencyTest.writeToFile();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating latency test thread",e);
+ }
+ t.start();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
new file mode 100644
index 0000000000..8ab1379fce
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.tools;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class MessageFactory
+{
+ public static Message createBytesMessage(Session ssn, int size) throws JMSException
+ {
+ BytesMessage msg = ssn.createBytesMessage();
+ msg.writeBytes(createMessagePayload(size).getBytes());
+ return msg;
+ }
+
+ public static Message createTextMessage(Session ssn, int size) throws JMSException
+ {
+ TextMessage msg = ssn.createTextMessage();
+ msg.setText(createMessagePayload(size));
+ return msg;
+ }
+
+ public static String createMessagePayload(int size)
+ {
+ String msgData = "Qpid Test Message";
+
+ StringBuffer buf = new StringBuffer(size);
+ int count = 0;
+ while (count <= (size - msgData.length()))
+ {
+ buf.append(msgData);
+ count += msgData.length();
+ }
+ if (count < size)
+ {
+ buf.append(msgData, 0, size - count);
+ }
+
+ return buf.toString();
+ }
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
new file mode 100644
index 0000000000..ac597d17de
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.text.DecimalFormat;
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+
+public class PerfBase
+{
+ TestParams params;
+ Connection con;
+ Session session;
+ Destination dest;
+ Destination feedbackDest;
+ DecimalFormat df = new DecimalFormat("###.##");
+
+ public PerfBase()
+ {
+ params = new TestParams();
+ }
+
+ public void setUp() throws Exception
+ {
+
+ if (params.getHost().equals("") || params.getPort() == -1)
+ {
+ con = new AMQConnection(params.getUrl());
+ }
+ else
+ {
+ con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test");
+ }
+ con.start();
+ session = con.createSession(params.isTransacted(),
+ params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
+
+ dest = new AMQAnyDestination(params.getAddress());
+ }
+
+ public void handleError(Exception e,String msg)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+ }
+}
+
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
new file mode 100644
index 0000000000..0ef0455a64
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -0,0 +1,267 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * PerfConsumer will receive x no of messages in warmup mode.
+ * Once it receives the Start message it will then signal the PerfProducer.
+ * It will start recording stats from the first message it receives after
+ * the warmup mode is done.
+ *
+ * The following calculations are done.
+ * The important numbers to look at is
+ * a) Avg Latency
+ * b) System throughput.
+ *
+ * Latency.
+ * =========
+ * Currently this test is written with the assumption that either
+ * a) The Perf Producer and Consumer are on the same machine
+ * b) They are on separate machines that have their time synced via a Time Server
+ *
+ * In order to calculate latency the producer inserts a timestamp
+ * hen the message is sent. The consumer will note the current time the message is
+ * received and will calculate the latency as follows
+ * latency = rcvdTime - msg.getJMSTimestamp()
+ *
+ * Through out the test it will keep track of the max and min latency to show the
+ * variance in latencies.
+ *
+ * Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
+ *
+ * Throughput
+ * ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
+ * Consumer rate is calculated as
+ * rcvdMsgCount/(rcvdTime - startTime)
+ *
+ * Note that the testStartTime referes to when the producer sent the first message
+ * and startTime is when the consumer first received a message.
+ *
+ * rcvdTime keeps track of when the last message is received.
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ */
+
+public class PerfConsumer extends PerfBase implements MessageListener
+{
+ MessageConsumer consumer;
+ long maxLatency = 0;
+ long minLatency = Long.MAX_VALUE;
+ long totalLatency = 0; // to calculate avg latency.
+ int rcvdMsgCount = 0;
+ long testStartTime = 0; // to measure system throughput
+ long startTime = 0; // to measure consumer throughput
+ long rcvdTime = 0;
+ boolean transacted = false;
+ int transSize = 0;
+
+ final Object lock = new Object();
+
+ public PerfConsumer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(dest);
+
+ // Storing the following two for efficiency
+ transacted = params.isTransacted();
+ transSize = params.getTransactionSize();
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+
+ boolean start = false;
+ while (!start)
+ {
+ Message msg = consumer.receive();
+ if (msg instanceof TextMessage)
+ {
+ if (((TextMessage)msg).getText().equals("End"))
+ {
+ start = true;
+ MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+ temp.send(session.createMessage());
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ temp.close();
+ }
+ }
+ }
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ consumer.setMessageListener(this);
+ }
+
+ public void printResults() throws Exception
+ {
+ synchronized (lock)
+ {
+ lock.wait();
+ }
+
+ double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+ double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+ System.out.println(new StringBuilder("Consumer rate : ").
+ append(df.format(consRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(throughput)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Avg Latency : ").
+ append(df.format(avgLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min Latency : ").
+ append(minLatency).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max Latency : ").
+ append(maxLatency).
+ append(" ms").toString());
+ System.out.println("Completed the test......\n");
+ }
+
+ public void notifyCompletion(Destination replyTo) throws Exception
+ {
+ MessageProducer tmp = session.createProducer(replyTo);
+ Message endMsg = session.createMessage();
+ tmp.send(endMsg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ tmp.close();
+ }
+
+ public void tearDown() throws Exception
+ {
+ consumer.close();
+ session.close();
+ con.close();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ {
+ notifyCompletion(msg.getJMSReplyTo());
+
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ else
+ {
+ rcvdTime = System.currentTimeMillis();
+ rcvdMsgCount ++;
+
+ if (rcvdMsgCount == 1)
+ {
+ startTime = rcvdTime;
+ testStartTime = msg.getJMSTimestamp();
+ }
+
+ if (transacted && (rcvdMsgCount % transSize == 0))
+ {
+ session.commit();
+ }
+
+ long latency = rcvdTime - msg.getJMSTimestamp();
+ maxLatency = Math.max(maxLatency, latency);
+ minLatency = Math.min(minLatency, latency);
+ totalLatency = totalLatency + latency;
+ }
+
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when receiving messages");
+ }
+
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ printResults();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ final PerfConsumer cons = new PerfConsumer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
new file mode 100644
index 0000000000..015d1e6205
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y no of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * This is done with the assumption that both consumer and producer are running on
+ * the same machine or different machines which have time synced using a time server.
+ *
+ * This test also calculates the producer rate as follows.
+ * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ * Rajith - Producer rate is not an accurate perf metric IMO.
+ * It is heavily inlfuenced by any in memory buffering.
+ * System throughput and latencies calculated by the PerfConsumer are more realistic
+ * numbers.
+ *
+ */
+public class PerfProducer extends PerfBase
+{
+ MessageProducer producer;
+ Message msg;
+ byte[] payload;
+ List<byte[]> payloads;
+ boolean cacheMsg = false;
+ boolean randomMsgSize = false;
+ boolean durable = false;
+ Random random;
+ int msgSizeRange = 1024;
+
+ public PerfProducer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ feedbackDest = session.createTemporaryQueue();
+
+ durable = params.isDurable();
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ cacheMsg = true;
+
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else if (params.isRandomMsgSize())
+ {
+ random = new Random(20080921);
+ randomMsgSize = true;
+ msgSizeRange = params.getMsgSize();
+ payloads = new ArrayList<byte[]>(msgSizeRange);
+
+ for (int i=0; i < msgSizeRange; i++)
+ {
+ payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+ }
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (cacheMsg)
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+
+ if (!randomMsgSize)
+ {
+ ((BytesMessage)msg).writeBytes(payload);
+ }
+ else
+ {
+ ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+ }
+ msg.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+
+ for (int i=0; i < params.getWarmupCount() -1; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+ boolean transacted = params.isTransacted();
+ int tranSize = params.getTransactionSize();
+
+ long start = System.currentTimeMillis();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % tranSize == 0))
+ {
+ session.commit();
+ }
+ }
+ long time = System.currentTimeMillis() - start;
+ double rate = ((double)count/(double)time)*1000;
+ System.out.println(new StringBuilder("Producer rate: ").
+ append(df.format(rate)).
+ append(" msg/sec").
+ toString());
+ }
+
+ public void waitForCompletion() throws Exception
+ {
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ System.out.println("Consumer has completed the test......");
+ }
+
+ public void tearDown() throws Exception
+ {
+ producer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ waitForCompletion();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ final PerfProducer prod = new PerfProducer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
new file mode 100644
index 0000000000..602fcc6321
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
@@ -0,0 +1,904 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
+import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
+import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeBind;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageSubscribe;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.QueueDeclare;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
+
+/**
+ * QpidBench
+ *
+ */
+
+public class QpidBench
+{
+
+ static enum Mode
+ {
+ PUBLISH, CONSUME, BOTH
+ }
+
+ private static class Options
+ {
+ private StringBuilder usage = new StringBuilder("qpid-bench <options>");
+
+ void usage(String name, String description, Object def)
+ {
+ String defval = "";
+ if (def != null)
+ {
+ defval = String.format(" (%s)", def);
+ }
+ usage.append(String.format("\n %-15s%-14s %s", name, defval, description));
+ }
+
+ public String broker = "localhost";
+ public int port = 5672;
+ public long count = 1000000;
+ public long window = 100000;
+ public long sample = window;
+ public int size = 1024;
+ public Mode mode = BOTH;
+ public boolean timestamp = false;
+ public boolean message_id = false;
+ public boolean message_cache = false;
+ public boolean persistent = false;
+ public boolean jms_publish = false;
+ public boolean jms_consume = false;
+ public boolean help = false;
+
+ {
+ usage("-b, --broker", "the broker hostname", broker);
+ }
+
+ public void parse__broker(String b)
+ {
+ this.broker = b;
+ }
+
+ public void parse_b(String b)
+ {
+ parse__broker(b);
+ }
+
+ {
+ usage("-p, --port", "the broker port", port);
+ }
+
+ public void parse__port(String p)
+ {
+ this.port = Integer.parseInt(p);
+ }
+
+ public void parse_p(String p)
+ {
+ parse__port(p);
+ }
+
+ {
+ usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count);
+ }
+
+ public void parse__count(String c)
+ {
+ this.count = Long.parseLong(c);
+ }
+
+ public void parse_c(String c)
+ {
+ parse__count(c);
+ }
+
+ {
+ usage("-w, --window", "the number of messages to send before blocking, 0 disables", window);
+ }
+
+ public void parse__window(String w)
+ {
+ this.window = Long.parseLong(w);
+ }
+
+ public void parse_w(String w)
+ {
+ parse__window(w);
+ }
+
+ {
+ usage("--sample", "print stats after this many messages, 0 disables", sample);
+ }
+
+ public void parse__sample(String s)
+ {
+ this.sample = Long.parseLong(s);
+ }
+
+ {
+ usage("-i, --interval", "sets both --window and --sample", window);
+ }
+
+ public void parse__interval(String i)
+ {
+ this.window = Long.parseLong(i);
+ this.sample = window;
+ }
+
+ public void parse_i(String i)
+ {
+ parse__interval(i);
+ }
+
+ {
+ usage("-s, --size", "the message size", size);
+ }
+
+ public void parse__size(String s)
+ {
+ this.size = Integer.parseInt(s);
+ }
+
+ public void parse_s(String s)
+ {
+ parse__size(s);
+ }
+
+ {
+ usage("-m, --mode", "one of publish, consume, or both", mode);
+ }
+
+ public void parse__mode(String m)
+ {
+ if (m.equalsIgnoreCase("publish"))
+ {
+ this.mode = PUBLISH;
+ }
+ else if (m.equalsIgnoreCase("consume"))
+ {
+ this.mode = CONSUME;
+ }
+ else if (m.equalsIgnoreCase("both"))
+ {
+ this.mode = BOTH;
+ }
+ else
+ {
+ throw new IllegalArgumentException
+ ("must be one of 'publish', 'consume', or 'both'");
+ }
+ }
+
+ public void parse_m(String m)
+ {
+ parse__mode(m);
+ }
+
+ {
+ usage("--timestamp", "set timestamps on each message if true", timestamp);
+ }
+
+ public void parse__timestamp(String t)
+ {
+ this.timestamp = Boolean.parseBoolean(t);
+ }
+
+ {
+ usage("--mesage-id", "set the message-id on each message if true", message_id);
+ }
+
+ public void parse__message_id(String m)
+ {
+ this.message_id = Boolean.parseBoolean(m);
+ }
+
+ {
+ usage("--message-cache", "reuse the same message for each send if true", message_cache);
+ }
+
+ public void parse__message_cache(String c)
+ {
+ this.message_cache = Boolean.parseBoolean(c);
+ }
+
+ {
+ usage("--persistent", "set the delivery-mode to persistent if true", persistent);
+ }
+
+ public void parse__persistent(String p)
+ {
+ this.persistent = Boolean.parseBoolean(p);
+ }
+
+ {
+ usage("--jms-publish", "use the jms client for publish", jms_publish);
+ }
+
+ public void parse__jms_publish(String jp)
+ {
+ this.jms_publish = Boolean.parseBoolean(jp);
+ }
+
+ {
+ usage("--jms-consume", "use the jms client for consume", jms_consume);
+ }
+
+ public void parse__jms_consume(String jc)
+ {
+ this.jms_consume = Boolean.parseBoolean(jc);
+ }
+
+ {
+ usage("--jms", "sets both --jms-publish and --jms-consume", false);
+ }
+
+ public void parse__jms(String j)
+ {
+ this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
+ }
+
+ {
+ usage("-h, --help", "prints this message", null);
+ }
+
+ public void parse__help()
+ {
+ this.help = true;
+ }
+
+ public void parse_h()
+ {
+ parse__help();
+ }
+
+ public String parse(String ... args)
+ {
+ Class klass = getClass();
+ List<String> arguments = new ArrayList<String>();
+ for (int i = 0; i < args.length; i++)
+ {
+ String option = args[i];
+
+ if (!option.startsWith("-"))
+ {
+ arguments.add(option);
+ continue;
+ }
+
+ String method = "parse" + option.replace('-', '_');
+ try
+ {
+ try
+ {
+ Method parser = klass.getMethod(method);
+ parser.invoke(this);
+ }
+ catch (NoSuchMethodException e)
+ {
+ try
+ {
+ Method parser = klass.getMethod(method, String.class);
+
+ String value = null;
+ if (i + 1 < args.length)
+ {
+ value = args[i+1];
+ i++;
+ }
+ else
+ {
+ return option + " requires a value";
+ }
+
+ parser.invoke(this, value);
+ }
+ catch (NoSuchMethodException e2)
+ {
+ return "no such option: " + option;
+ }
+ }
+ }
+ catch (InvocationTargetException e)
+ {
+ Throwable t = e.getCause();
+ return String.format
+ ("error parsing %s: %s: %s", option, t.getClass().getName(),
+ t.getMessage());
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException
+ ("unable to access parse method: " + option, e);
+ }
+ }
+
+ return parseArguments(arguments);
+ }
+
+ public String parseArguments(List<String> arguments)
+ {
+ if (arguments.size() > 0)
+ {
+ String args = arguments.toString();
+ return "unrecognized arguments: " + args.substring(1, args.length() - 1);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public String toString()
+ {
+ Class klass = getClass();
+ Field[] fields = klass.getFields();
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < fields.length; i++)
+ {
+ if (i > 0)
+ {
+ str.append("\n");
+ }
+
+ String name = fields[i].getName();
+ str.append(name);
+ str.append(" = ");
+ Object value;
+ try
+ {
+ value = fields[i].get(this);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException
+ ("unable to access field: " + name, e);
+ }
+ str.append(value);
+ }
+
+ return str.toString();
+ }
+ }
+
+ public static final void main(String[] args) throws Exception
+ {
+ final Options opts = new Options();
+ String error = opts.parse(args);
+ if (error != null)
+ {
+ System.err.println(error);
+ System.exit(-1);
+ return;
+ }
+
+ if (opts.help)
+ {
+ System.out.println(opts.usage);
+ return;
+ }
+
+ System.out.println(opts);
+
+ switch (opts.mode)
+ {
+ case CONSUME:
+ case BOTH:
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if (opts.jms_consume)
+ {
+ jms_consumer(opts);
+ }
+ else
+ {
+ native_consumer(opts);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ System.out.println("Consumer Completed");
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
+ break;
+ }
+
+ switch (opts.mode)
+ {
+ case PUBLISH:
+ case BOTH:
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if (opts.jms_publish)
+ {
+ jms_publisher(opts);
+ }
+ else
+ {
+ native_publisher(opts);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ System.out.println("Producer Completed");
+ }
+ };
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating publisher thread",e);
+ }
+ t.start();
+ break;
+ }
+ }
+
+ private static enum Column
+ {
+ LEFT, RIGHT
+ }
+
+ private static final void sample(Options opts, Column col, String name, long count,
+ long start, long time, long lastTime)
+ {
+ String pfx = "";
+ String sfx = "";
+ if (opts.mode == BOTH)
+ {
+ if (col == Column.RIGHT)
+ {
+ pfx = " -- ";
+ }
+ else
+ {
+ sfx = " --";
+ }
+ }
+
+ if (count == 0)
+ {
+ String stats = String.format("%s: %tc", name, start);
+ System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
+ return;
+ }
+
+ double cumulative = 1000 * (double) count / (double) (time - start);
+ double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
+
+ String stats = String.format
+ ("%s: %d %.2f %.2f", name, count, cumulative, interval);
+ System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
+ }
+
+ private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
+ {
+ String url = String.format
+ ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
+ opts.broker, opts.port);
+ return new AMQConnection(url);
+ }
+
+ private static final void jms_publisher(Options opts) throws Exception
+ {
+ javax.jms.Connection conn = getJMSConnection(opts);
+
+ javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ Destination dest = ssn.createQueue("test-queue");
+ Destination echo_dest = ssn.createQueue("echo-queue");
+ MessageProducer prod = ssn.createProducer(dest);
+ MessageConsumer cons = ssn.createConsumer(echo_dest);
+ prod.setDisableMessageID(!opts.message_id);
+ prod.setDisableMessageTimestamp(!opts.timestamp);
+ prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < opts.size; i++)
+ {
+ str.append((char) (i % 128));
+ }
+
+ String body = str.toString();
+
+ TextMessage cached = ssn.createTextMessage();
+ cached.setText(body);
+
+ conn.start();
+
+ long count = 0;
+ long lastTime = 0;
+ long start = System.currentTimeMillis();
+ while (opts.count == 0 || count < opts.count)
+ {
+ if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
+ {
+ Message echo = cons.receive();
+ }
+
+ if (opts.sample > 0 && (count % opts.sample) == 0)
+ {
+ long time = System.currentTimeMillis();
+ sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
+ lastTime = time;
+ }
+
+ TextMessage m;
+ if (opts.message_cache)
+ {
+ m = cached;
+ }
+ else
+ {
+ m = ssn.createTextMessage();
+ m.setText(body);
+ }
+
+ prod.send(m);
+ count++;
+ }
+
+ conn.close();
+ }
+
+ private static final void jms_consumer(final Options opts) throws Exception
+ {
+ final javax.jms.Connection conn = getJMSConnection(opts);
+ javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+ Destination dest = ssn.createQueue("test-queue");
+ Destination echo_dest = ssn.createQueue("echo-queue");
+ MessageConsumer cons = ssn.createConsumer(dest);
+ final MessageProducer prod = ssn.createProducer(echo_dest);
+ prod.setDisableMessageID(true);
+ prod.setDisableMessageTimestamp(true);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ final TextMessage echo = ssn.createTextMessage();
+ echo.setText("ECHO");
+
+ final Object done = new Object();
+ cons.setMessageListener(new MessageListener()
+ {
+ private long count = 0;
+ private long lastTime = 0;
+ private long start;
+
+ public void onMessage(Message m)
+ {
+ if (count == 0)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ try
+ {
+ boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+ long time = sample ? System.currentTimeMillis() : 0;
+
+ if (opts.window > 0 && (count % opts.window) == 0)
+ {
+ prod.send(echo);
+ }
+
+ if (sample)
+ {
+ sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
+ lastTime = time;
+ }
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ count++;
+
+ if (opts.count > 0 && count >= opts.count)
+ {
+ synchronized (done)
+ {
+ done.notify();
+ }
+ }
+ }
+ });
+
+ conn.start();
+ synchronized (done)
+ {
+ done.wait();
+ }
+ conn.close();
+ }
+
+ private static final org.apache.qpid.transport.Connection getConnection
+ (Options opts)
+ {
+ org.apache.qpid.transport.Connection conn =
+ new org.apache.qpid.transport.Connection();
+ conn.connect(opts.broker, opts.port, null, "guest", "guest",false);
+ return conn;
+ }
+
+ private static abstract class NativeListener implements SessionListener
+ {
+
+ public void opened(org.apache.qpid.transport.Session ssn) {}
+
+ public void resumed(org.apache.qpid.transport.Session ssn) {}
+
+ public void exception(org.apache.qpid.transport.Session ssn,
+ SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(org.apache.qpid.transport.Session ssn) {}
+
+ }
+
+ private static final void native_publisher(Options opts) throws Exception
+ {
+ final long[] echos = { 0 };
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ synchronized (echos)
+ {
+ echos[0]++;
+ echos.notify();
+ }
+ ssn.processed(xfr);
+ }
+ });
+
+ ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
+ ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
+
+ MessageProperties cached_mp = new MessageProperties();
+ DeliveryProperties cached_dp = new DeliveryProperties();
+ cached_dp.setRoutingKey("test-queue");
+ cached_dp.setDeliveryMode
+ (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
+
+ int size = opts.size;
+ ByteBuffer body = ByteBuffer.allocate(size);
+ for (int i = 0; i < size; i++)
+ {
+ body.put((byte) i);
+ }
+ body.flip();
+
+ ssn.invoke(new MessageSubscribe()
+ .queue("echo-queue")
+ .destination("echo-queue")
+ .acceptMode(MessageAcceptMode.NONE)
+ .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
+ ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
+ ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
+ ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
+
+ UUIDGen gen = UUIDs.newGenerator();
+
+ long count = 0;
+ long lastTime = 0;
+ long start = System.currentTimeMillis();
+ while (opts.count == 0 || count < opts.count)
+ {
+ if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
+ {
+ synchronized (echos)
+ {
+ while (echos[0] < (count/opts.window))
+ {
+ echos.wait();
+ }
+ }
+ }
+
+ if (opts.sample > 0 && (count % opts.sample) == 0)
+ {
+ long time = System.currentTimeMillis();
+ sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
+ lastTime = time;
+ }
+
+ MessageProperties mp;
+ DeliveryProperties dp;
+ if (opts.message_cache)
+ {
+ mp = cached_mp;
+ dp = cached_dp;
+ }
+ else
+ {
+ mp = new MessageProperties();
+ dp = new DeliveryProperties();
+ dp.setRoutingKey("test-queue");
+ dp.setDeliveryMode
+ (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
+
+ }
+
+ if (opts.message_id)
+ {
+ mp.setMessageId(gen.generate());
+ }
+
+ if (opts.timestamp)
+ {
+ dp.setTimestamp(System.currentTimeMillis());
+ }
+
+ ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+ new Header(dp, mp), body.slice());
+ count++;
+ }
+
+ ssn.messageCancel("echo-queue");
+
+ ssn.sync();
+ ssn.close();
+ conn.close();
+ }
+
+ private static final void native_consumer(final Options opts) throws Exception
+ {
+ final DeliveryProperties dp = new DeliveryProperties();
+ final byte[] echo = new byte[0];
+ dp.setRoutingKey("echo-queue");
+ dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
+ final MessageProperties mp = new MessageProperties();
+ final Object done = new Object();
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ private long count = 0;
+ private long lastTime = 0;
+ private long start;
+
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ if (count == 0)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+ long time = sample ? System.currentTimeMillis() : 0;
+
+ if (opts.window > 0 && (count % opts.window) == 0)
+ {
+ ssn.messageTransfer("amq.direct",
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(dp, mp),
+ echo);
+ }
+
+ if (sample)
+ {
+ sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
+ lastTime = time;
+ }
+ ssn.processed(xfr);
+ count++;
+
+ if (opts.count > 0 && count >= opts.count)
+ {
+ synchronized (done)
+ {
+ done.notify();
+ }
+ }
+ }
+ });
+
+ ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
+ ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
+ ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
+
+ ssn.invoke(new MessageSubscribe()
+ .queue("test-queue")
+ .destination("test-queue")
+ .acceptMode(MessageAcceptMode.NONE)
+ .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
+ ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
+ ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
+ ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
+
+ synchronized (done)
+ {
+ done.wait();
+ }
+
+ ssn.messageCancel("test-queue");
+
+ ssn.sync();
+ ssn.close();
+ conn.close();
+ }
+
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
new file mode 100644
index 0000000000..89d6462a39
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import javax.jms.Session;
+
+public class TestParams
+{
+ /*
+ * By default the connection URL is used.
+ * This allows a user to easily specify a fully fledged URL any given property.
+ * Ex. SSL parameters
+ *
+ * By providing a host & port allows a user to simply override the URL.
+ * This allows to create multiple clients in test scripts easily,
+ * without having to deal with the long URL format.
+ */
+ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
+
+ private String host = "";
+
+ private int port = -1;
+
+ private String address = "queue; {create : always}";
+
+ private int msg_size = 1024;
+
+ private int msg_type = 1; // not used yet
+
+ private boolean cacheMessage = false;
+
+ private boolean disableMessageID = false;
+
+ private boolean disableTimestamp = false;
+
+ private boolean durable = false;
+
+ private boolean transacted = false;
+
+ private int transaction_size = 1000;
+
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+ private int msg_count = 10;
+
+ private int warmup_count = 1;
+
+ private boolean random_msg_size = false;
+
+ public TestParams()
+ {
+
+ url = System.getProperty("url",url);
+ host = System.getProperty("host","");
+ port = Integer.getInteger("port", -1);
+ address = System.getProperty("address","queue");
+
+ msg_size = Integer.getInteger("msg_size", 1024);
+ msg_type = Integer.getInteger("msg_type",1);
+ cacheMessage = Boolean.getBoolean("cache_msg");
+ disableMessageID = Boolean.getBoolean("disableMessageID");
+ disableTimestamp = Boolean.getBoolean("disableTimestamp");
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ transaction_size = Integer.getInteger("trans_size",1000);
+ ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
+ msg_count = Integer.getInteger("msg_count",msg_count);
+ warmup_count = Integer.getInteger("warmup_count",warmup_count);
+ random_msg_size = Boolean.getBoolean("random_msg_size");
+ }
+
+ public String getUrl()
+ {
+ return url;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public String getAddress()
+ {
+ return address;
+ }
+
+ public int getAckMode()
+ {
+ return ack_mode;
+ }
+
+ public int getMsgCount()
+ {
+ return msg_count;
+ }
+
+ public int getMsgSize()
+ {
+ return msg_size;
+ }
+
+ public int getMsgType()
+ {
+ return msg_type;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ public int getTransactionSize()
+ {
+ return transaction_size;
+ }
+
+ public int getWarmupCount()
+ {
+ return warmup_count;
+ }
+
+ public boolean isCacheMessage()
+ {
+ return cacheMessage;
+ }
+
+ public boolean isDisableMessageID()
+ {
+ return disableMessageID;
+ }
+
+ public boolean isDisableTimestamp()
+ {
+ return disableTimestamp;
+ }
+
+ public boolean isRandomMsgSize()
+ {
+ return random_msg_size;
+ }
+
+}