summaryrefslogtreecommitdiff
path: root/qpid/java/tools/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools/src/main')
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java154
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java27
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java216
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java197
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java370
5 files changed, 964 insertions, 0 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
new file mode 100644
index 0000000000..b10129d855
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public abstract class Client implements ExceptionListener
+{
+ private Connection con;
+ private Session ssn;
+ private boolean durable = false;
+ private boolean transacted = false;
+ private int txSize = 10;
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+ private String contentType = "application/octet-stream";
+
+ private long reportFrequency = 60000; // every min
+
+ private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ private NumberFormat nf = new DecimalFormat("##.00");
+
+ private long startTime = System.currentTimeMillis();
+ private ErrorHandler errorHandler = null;
+
+ public Client(Connection con) throws Exception
+ {
+ this.con = con;
+ this.con.setExceptionListener(this);
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ txSize = Integer.getInteger("tx_size",10);
+ contentType = System.getProperty("content_type","application/octet-stream");
+ reportFrequency = Long.getLong("report_frequency", 60000);
+ }
+
+ public void close()
+ {
+ try
+ {
+ con.close();
+ }
+ catch (Exception e)
+ {
+ handleError("Error closing connection",e);
+ }
+ }
+
+ public void onException(JMSException e)
+ {
+ handleError("Connection error",e);
+ }
+
+ public void setErrorHandler(ErrorHandler h)
+ {
+ this.errorHandler = h;
+ }
+
+ public void handleError(String msg,Exception e)
+ {
+ if (errorHandler != null)
+ {
+ errorHandler.handleError(msg, e);
+ }
+ else
+ {
+ System.err.println(msg);
+ e.printStackTrace();
+ }
+ }
+
+ protected Session getSsn()
+ {
+ return ssn;
+ }
+
+ protected void setSsn(Session ssn)
+ {
+ this.ssn = ssn;
+ }
+
+ protected boolean isDurable()
+ {
+ return durable;
+ }
+
+ protected boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ protected int getTxSize()
+ {
+ return txSize;
+ }
+
+ protected int getAck_mode()
+ {
+ return ack_mode;
+ }
+
+ protected String getContentType()
+ {
+ return contentType;
+ }
+
+ protected long getReportFrequency()
+ {
+ return reportFrequency;
+ }
+
+ protected long getStartTime()
+ {
+ return startTime;
+ }
+
+ protected void setStartTime(long startTime)
+ {
+ this.startTime = startTime;
+ }
+
+ public DateFormat getDf()
+ {
+ return df;
+ }
+
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
new file mode 100644
index 0000000000..dbc73c404f
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
@@ -0,0 +1,27 @@
+package org.apache.qpid.testkit;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+public interface ErrorHandler {
+
+ public void handleError(String msg,Exception e);
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
new file mode 100644
index 0000000000..b4294ee4cc
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * A generic receiver which consumes messages
+ * from a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It participates in a feedback loop to ensure the producer
+ * doesn't fill up the queue. If it receives an "End" msg
+ * it sends a reply to the replyTo address in that msg.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * sync_rcv - Whether to consume sync (instead of using a listener).
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs. *
+ * check_for_dups - check for duplicate messages and out of order messages.
+ * jms_durable_sub - create a durable subscription instead of a regular subscription.
+ */
+public class Receiver extends Client implements MessageListener
+{
+ long msg_count = 0;
+ int sequence = 0;
+ boolean syncRcv = Boolean.getBoolean("sync_rcv");
+ boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
+ boolean checkForDups = Boolean.getBoolean("check_for_dups");
+ MessageConsumer consumer;
+ List<Integer> duplicateMessages = new ArrayList<Integer>();
+
+ public Receiver(Connection con,String addr) throws Exception
+ {
+ super(con);
+ setSsn(con.createSession(isTransacted(), getAck_mode()));
+ consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
+ if (!syncRcv)
+ {
+ consumer.setMessageListener(this);
+ }
+
+ System.out.println("Receiving messages from : " + addr);
+ }
+
+ public void onMessage(Message msg)
+ {
+ handleMessage(msg);
+ }
+
+ public void run() throws Exception
+ {
+ long sleepTime = getReportFrequency();
+ while(true)
+ {
+ if(syncRcv)
+ {
+ long t = sleepTime;
+ while (t > 0)
+ {
+ long start = System.currentTimeMillis();
+ Message msg = consumer.receive(t);
+ t = t - (System.currentTimeMillis() - start);
+ handleMessage(msg);
+ }
+ }
+ Thread.sleep(sleepTime);
+ System.out.println(getDf().format(System.currentTimeMillis())
+ + " - messages received : " + msg_count);
+ }
+ }
+
+ private void handleMessage(Message m)
+ {
+ if (m == null) { return; }
+
+ try
+ {
+ if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+ {
+ MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
+ Message controlMsg = getSsn().createTextMessage();
+ temp.send(controlMsg);
+ if (isTransacted())
+ {
+ getSsn().commit();
+ }
+ temp.close();
+ }
+ else
+ {
+
+ int seq = m.getIntProperty("sequence");
+ if (checkForDups)
+ {
+ if (seq == 0)
+ {
+ sequence = 0; // wrap around for each iteration
+ System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
+ duplicateMessages.clear();
+ }
+
+ if (seq < sequence)
+ {
+ duplicateMessages.add(seq);
+ }
+ else if (seq == sequence)
+ {
+ sequence++;
+ msg_count ++;
+ }
+ else
+ {
+ // Multiple publishers are not allowed in this test case.
+ // So out of order messages are not allowed.
+ throw new Exception(": Received an out of order message (expected="
+ + sequence + ",received=" + seq + ")" );
+ }
+ }
+ else
+ {
+ msg_count ++;
+ }
+
+ // Please note that this test case doesn't expect duplicates
+ // When testing for transactions.
+ if (isTransacted() && msg_count % getTxSize() == 0)
+ {
+ getSsn().commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ handleError("Exception receiving messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+ String addr = "message_queue";
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ if (args.length > 2)
+ {
+ addr = args[2];
+ }
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ Receiver rcv = new Receiver(con,addr);
+ rcv.run();
+ }
+
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
new file mode 100644
index 0000000000..14b9b7302f
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.tools.MessageFactory;
+
+/**
+ * A generic sender which sends a stream of messages
+ * to a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It has a feedback loop to ensure it doesn't fill
+ * up queues due to a slow consumer.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * msg_size (256)
+ * msg_count (10) - # messages before waiting for feedback
+ * sleep_time (1000 ms) - sleep time btw each iteration
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs.
+ */
+public class Sender extends Client
+{
+ protected int msg_size = 256;
+ protected int msg_count = 10;
+ protected int iterations = -1;
+ protected long sleep_time = 1000;
+
+ protected Destination dest = null;
+ protected Destination replyTo = null;
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+
+ protected MessageProducer producer;
+ Random gen = new Random(19770905);
+
+ public Sender(Connection con,String addr) throws Exception
+ {
+ super(con);
+ this.msg_size = Integer.getInteger("msg_size", 100);
+ this.msg_count = Integer.getInteger("msg_count", 10);
+ this.iterations = Integer.getInteger("iterations", -1);
+ this.sleep_time = Long.getLong("sleep_time", 1000);
+ this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
+ this.dest = new AMQAnyDestination(addr);
+ this.producer = getSsn().createProducer(dest);
+ this.replyTo = getSsn().createTemporaryQueue();
+
+ System.out.println("Sending messages to : " + addr);
+ }
+
+ /*
+ * If msg_size not specified it generates a message
+ * between 500-1500 bytes.
+ */
+ protected Message getNextMessage() throws Exception
+ {
+ int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
+ Message msg = (getContentType().equals("text/plain")) ?
+ MessageFactory.createTextMessage(getSsn(), s):
+ MessageFactory.createBytesMessage(getSsn(), s);
+
+ msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT
+ : DeliveryMode.NON_PERSISTENT);
+ return msg;
+ }
+
+ public void run()
+ {
+ try
+ {
+ boolean infinite = (iterations == -1);
+ for (int x=0; infinite || x < iterations; x++)
+ {
+ long now = System.currentTimeMillis();
+ if (now - getStartTime() >= getReportFrequency())
+ {
+ System.out.println(df.format(now) + " - iterations : " + x);
+ setStartTime(now);
+ }
+
+ for (int i = 0; i < msg_count; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setIntProperty("sequence",i);
+ producer.send(msg);
+ if (isTransacted() && msg_count % getTxSize() == 0)
+ {
+ getSsn().commit();
+ }
+ }
+ TextMessage m = getSsn().createTextMessage("End");
+ m.setJMSReplyTo(replyTo);
+ producer.send(m);
+
+ if (isTransacted())
+ {
+ getSsn().commit();
+ }
+
+ MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
+ feedbackConsumer.receive();
+ feedbackConsumer.close();
+ if (isTransacted())
+ {
+ getSsn().commit();
+ }
+ Thread.sleep(sleep_time);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception sending messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+ String addr = "message_queue";
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ if (args.length > 2)
+ {
+ addr = args[2];
+ }
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ Sender sender = new Sender(con,addr);
+ sender.run();
+ }
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
new file mode 100644
index 0000000000..36ae7cad42
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
+
+/**
+ * A basic test case class that could launch a Sender/Receiver
+ * or both, each on it's own separate thread.
+ *
+ * If con_count == ssn_count, then each entity created will have
+ * it's own Connection. Else if con_count < ssn_count, then
+ * a connection will be shared by ssn_count/con_count # of entities.
+ *
+ * The if both sender and receiver options are set, it will
+ * share a connection.
+ *
+ * The following options are available as jvm args
+ * host, port
+ * con_count,ssn_count
+ * con_idle_time - which determines heartbeat
+ * sender, receiver - booleans which indicate which entity to create.
+ * Setting them both is also a valid option.
+ */
+public class TestLauncher implements ErrorHandler
+{
+ protected String host = "127.0.0.1";
+ protected int port = 5672;
+ protected int sessions_per_con = 1;
+ protected int connection_count = 1;
+ protected long heartbeat = 5000;
+ protected boolean sender = false;
+ protected boolean receiver = false;
+ protected boolean useUniqueDests = false;
+ protected String url;
+
+ protected String address = "my_queue; {create: always}";
+ protected boolean durable = false;
+ protected String failover = "";
+ protected AMQConnection controlCon;
+ protected Destination controlDest = null;
+ protected Session controlSession = null;
+ protected MessageProducer statusSender;
+ protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+ protected String testName;
+
+ public TestLauncher()
+ {
+ testName = System.getProperty("test_name","UNKNOWN");
+ host = System.getProperty("host", "127.0.0.1");
+ port = Integer.getInteger("port", 5672);
+ sessions_per_con = Integer.getInteger("ssn_per_con", 1);
+ connection_count = Integer.getInteger("con_count", 1);
+ heartbeat = Long.getLong("heartbeat", 5);
+ sender = Boolean.getBoolean("sender");
+ receiver = Boolean.getBoolean("receiver");
+ useUniqueDests = Boolean.getBoolean("use_unique_dests");
+
+ failover = System.getProperty("failover", "");
+ durable = Boolean.getBoolean("durable");
+
+ url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
+
+ if (failover.equalsIgnoreCase("failover_exchange"))
+ {
+ url += "&failover='failover_exchange'";
+
+ System.out.println("Failover exchange " + url );
+ }
+
+ configureLogging();
+ }
+
+ protected void configureLogging()
+ {
+ PatternLayout layout = new PatternLayout();
+ layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
+ BasicConfigurator.configure(new ConsoleAppender(layout));
+
+ String logLevel = System.getProperty("log.level","warn");
+ String logComponent = System.getProperty("log.comp","org.apache.qpid");
+
+ Logger logger = Logger.getLogger(logComponent);
+ logger.setLevel(Level.toLevel(logLevel, Level.WARN));
+
+ System.out.println("Level " + logger.getLevel());
+
+ }
+
+ public void setUpControlChannel()
+ {
+ try
+ {
+ controlCon = new AMQConnection(url);
+ controlCon.start();
+
+ controlDest = new AMQAnyDestination("control; {create: always}"); // durable
+
+ // Create the session to setup the messages
+ controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ statusSender = controlSession.createProducer(controlDest);
+
+ }
+ catch (Exception e)
+ {
+ handleError("Error while setting up the test",e);
+ }
+ }
+
+ public void cleanup()
+ {
+ try
+ {
+ controlSession.close();
+ controlCon.close();
+ for (AMQConnection con : clients)
+ {
+ con.close();
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Error while tearing down the test",e);
+ }
+ }
+
+ public void start(String addr)
+ {
+ try
+ {
+ if (addr == null)
+ {
+ addr = address;
+ }
+
+ int ssn_per_con = sessions_per_con;
+ String addrTemp = addr;
+ for (int i = 0; i< connection_count; i++)
+ {
+ AMQConnection con = new AMQConnection(url);
+ con.start();
+ clients.add(con);
+ for (int j = 0; j< ssn_per_con; j++)
+ {
+ String index = createPrefix(i,j);
+ if (useUniqueDests)
+ {
+ addrTemp = modifySubject(index,addr);
+ }
+
+ if (sender)
+ {
+ createSender(index,con,addrTemp,this);
+ }
+
+ if (receiver)
+ {
+ System.out.println("########## Creating receiver ##################");
+
+ createReceiver(index,con,addrTemp,this);
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception while setting up the test",e);
+ }
+
+ }
+
+ protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Receiver rcv = new Receiver(con,addr);
+ rcv.setErrorHandler(h);
+ rcv.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Receiver", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().newThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Receive thread",e);
+ }
+
+ t.setName("ReceiverThread-" + index);
+ t.start();
+ }
+
+ protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Sender sender = new Sender(con, addr);
+ sender.setErrorHandler(h);
+ sender.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Sender", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().newThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Sender thread",e);
+ }
+
+ t.setName("SenderThread-" + index);
+ t.start();
+ }
+
+ public synchronized void handleError(String msg,Exception e)
+ {
+ // In case sending the message fails
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" @ ");
+ sb.append(df.format(new Date(System.currentTimeMillis())));
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+
+ try
+ {
+ TextMessage errorMsg = controlSession.createTextMessage();
+ errorMsg.setStringProperty("status", "error");
+ errorMsg.setStringProperty("desc", msg);
+ errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
+ errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
+
+ System.out.println("Msg " + errorMsg);
+
+ statusSender.send(errorMsg);
+ }
+ catch (JMSException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+
+ private String serializeStackTrace(Exception e)
+ {
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ PrintStream printStream = new PrintStream(bOut);
+ e.printStackTrace(printStream);
+ printStream.close();
+ return bOut.toString();
+ }
+
+ private String createPrefix(int i, int j)
+ {
+ return String.valueOf(i).concat(String.valueOf(j));
+ }
+
+ /**
+ * A basic helper function to modify the subjects by
+ * appending an index.
+ */
+ private String modifySubject(String index,String addr)
+ {
+ if (addr.indexOf("/") > 0)
+ {
+ addr = addr.substring(0,addr.indexOf("/")+1) +
+ index +
+ addr.substring(addr.indexOf("/")+1,addr.length());
+ }
+ else if (addr.indexOf(";") > 0)
+ {
+ addr = addr.substring(0,addr.indexOf(";")) +
+ "/" + index +
+ addr.substring(addr.indexOf(";"),addr.length());
+ }
+ else
+ {
+ addr = addr + "/" + index;
+ }
+
+ return addr;
+ }
+
+ public static void main(String[] args)
+ {
+ final TestLauncher test = new TestLauncher();
+ test.setUpControlChannel();
+ System.out.println("args.length " + args.length);
+ System.out.println("args [0] " + args [0]);
+ test.start(args.length > 0 ? args [0] : null);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() { test.cleanup(); }
+ });
+
+ }
+}