summaryrefslogtreecommitdiff
path: root/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java')
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java446
1 files changed, 446 insertions, 0 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java b/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
new file mode 100644
index 0000000000..5bf1864b4f
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
@@ -0,0 +1,446 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+public class StressTestClient
+{
+ private static final String QUEUE_NAME_PREFIX = "BURL:direct://amq.direct//stress-test-queue";
+ private static final String DURABLE_SUFFIX = "?durable='true'";
+
+ public static final String CONNECTIONS_ARG = "connections";
+ public static final String SESSIONS_ARG = "sessions";
+ public static final String CONSUME_IMMEDIATELY_ARG = "consumeImmediately";
+ public static final String CONSUMERS_ARG = "consumers";
+ public static final String CLOSE_CONSUMERS_ARG = "closeconsumers";
+ public static final String PRODUCERS_ARG = "producers";
+ public static final String MESSAGE_COUNT_ARG = "messagecount";
+ public static final String MESSAGE_SIZE_ARG = "size";
+ public static final String SUFFIX_ARG = "suffix";
+ public static final String REPETITIONS_ARG = "repetitions";
+ public static final String PERSISTENT_ARG = "persistent";
+ public static final String RANDOM_ARG = "random";
+ public static final String TIMEOUT_ARG = "timeout";
+ public static final String DELAYCLOSE_ARG = "delayclose";
+ public static final String REPORT_MOD_ARG = "reportmod";
+ public static final String LOW_PREFETCH_ARG = "lowprefetch";
+ public static final String TRANSACTED_ARG = "transacted";
+ public static final String TX_BATCH_ARG = "txbatch";
+
+ public static final String CONNECTIONS_DEFAULT = "1";
+ public static final String SESSIONS_DEFAULT = "1";
+ public static final String CONSUME_IMMEDIATELY_DEFAULT = "true";
+ public static final String CLOSE_CONSUMERS_DEFAULT = "true";
+ public static final String PRODUCERS_DEFAULT = "1";
+ public static final String CONSUMERS_DEFAULT = "1";
+ public static final String MESSAGE_COUNT_DEFAULT = "1";
+ public static final String MESSAGE_SIZE_DEFAULT = "256";
+ public static final String SUFFIX_DEFAULT = "";
+ public static final String REPETITIONS_DEFAULT = "1";
+ public static final String PERSISTENT_DEFAULT = "false";
+ public static final String RANDOM_DEFAULT = "true";
+ public static final String TIMEOUT_DEFAULT = "30000";
+ public static final String DELAYCLOSE_DEFAULT = "0";
+ public static final String REPORT_MOD_DEFAULT = "1";
+ public static final String LOW_PREFETCH_DEFAULT = "false";
+ public static final String TRANSACTED_DEFAULT = "false";
+ public static final String TX_BATCH_DEFAULT = "1";
+
+ private static final String CLASS = "StressTestClient";
+
+ public static void main(String[] args)
+ {
+ Map<String,String> options = new HashMap<>();
+ options.put(CONNECTIONS_ARG, CONNECTIONS_DEFAULT);
+ options.put(SESSIONS_ARG, SESSIONS_DEFAULT);
+ options.put(CONSUME_IMMEDIATELY_ARG, CONSUME_IMMEDIATELY_DEFAULT);
+ options.put(PRODUCERS_ARG, PRODUCERS_DEFAULT);
+ options.put(CONSUMERS_ARG, CONSUMERS_DEFAULT);
+ options.put(CLOSE_CONSUMERS_ARG, CLOSE_CONSUMERS_DEFAULT);
+ options.put(MESSAGE_COUNT_ARG, MESSAGE_COUNT_DEFAULT);
+ options.put(MESSAGE_SIZE_ARG, MESSAGE_SIZE_DEFAULT);
+ options.put(SUFFIX_ARG, SUFFIX_DEFAULT);
+ options.put(REPETITIONS_ARG, REPETITIONS_DEFAULT);
+ options.put(PERSISTENT_ARG, PERSISTENT_DEFAULT);
+ options.put(RANDOM_ARG, RANDOM_DEFAULT);
+ options.put(TIMEOUT_ARG, TIMEOUT_DEFAULT);
+ options.put(DELAYCLOSE_ARG, DELAYCLOSE_DEFAULT);
+ options.put(REPORT_MOD_ARG, REPORT_MOD_DEFAULT);
+ options.put(LOW_PREFETCH_ARG, LOW_PREFETCH_DEFAULT);
+ options.put(TRANSACTED_ARG, TRANSACTED_DEFAULT);
+ options.put(TX_BATCH_ARG, TX_BATCH_DEFAULT);
+
+ if(args.length == 1 &&
+ (args[0].equals("-h") || args[0].equals("--help") || args[0].equals("help")))
+ {
+ System.out.println("arg=value options: \n" + options.keySet());
+ return;
+ }
+
+ parseArgumentsIntoConfig(options, args);
+
+ StressTestClient testClient = new StressTestClient();
+ testClient.runTest(options);
+ }
+
+ public static void parseArgumentsIntoConfig(Map<String, String> initialValues, String[] args)
+ {
+ for(String arg: args)
+ {
+ String[] splitArg = arg.split("=");
+ if(splitArg.length != 2)
+ {
+ throw new IllegalArgumentException("arguments must have format <name>=<value>: " + arg);
+ }
+
+ if(initialValues.put(splitArg[0], splitArg[1]) == null)
+ {
+ throw new IllegalArgumentException("not a valid configuration property: " + arg);
+ }
+ }
+ }
+
+
+ private void runTest(Map<String,String> options)
+ {
+ int numConnections = Integer.parseInt(options.get(CONNECTIONS_ARG));
+ int numSessions = Integer.parseInt(options.get(SESSIONS_ARG));
+ int numProducers = Integer.parseInt(options.get(PRODUCERS_ARG));
+ int numConsumers = Integer.parseInt(options.get(CONSUMERS_ARG));
+ boolean closeConsumers = Boolean.valueOf(options.get(CLOSE_CONSUMERS_ARG));
+ boolean consumeImmediately = Boolean.valueOf(options.get(CONSUME_IMMEDIATELY_ARG));
+ int numMessage = Integer.parseInt(options.get(MESSAGE_COUNT_ARG));
+ int messageSize = Integer.parseInt(options.get(MESSAGE_SIZE_ARG));
+ int repetitions = Integer.parseInt(options.get(REPETITIONS_ARG));
+ String queueString = QUEUE_NAME_PREFIX + options.get(SUFFIX_ARG) + DURABLE_SUFFIX;
+ int deliveryMode = Boolean.valueOf(options.get(PERSISTENT_ARG)) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+ boolean random = Boolean.valueOf(options.get(RANDOM_ARG));
+ long recieveTimeout = Long.parseLong(options.get(TIMEOUT_ARG));
+ long delayClose = Long.parseLong(options.get(DELAYCLOSE_ARG));
+ int reportingMod = Integer.parseInt(options.get(REPORT_MOD_ARG));
+ boolean lowPrefetch = Boolean.valueOf(options.get(LOW_PREFETCH_ARG));
+ boolean transacted = Boolean.valueOf(options.get(TRANSACTED_ARG));
+ int txBatch = Integer.parseInt(options.get(TX_BATCH_ARG));
+
+ System.out.println(CLASS + ": Using options: " + options);
+
+ System.out.println(CLASS + ": Creating message payload of " + messageSize + " (bytes)");
+ byte[] sentBytes = generateMessage(random, messageSize);
+
+ try
+ {
+ // Load JNDI properties
+ Properties properties = new Properties();
+ try(InputStream is = this.getClass().getClassLoader().getResourceAsStream("stress-test-client.properties"))
+ {
+ properties.load(is);
+ }
+ Context ctx = new InitialContext(properties);
+
+ ConnectionFactory conFac;
+ if(lowPrefetch)
+ {
+ System.out.println(CLASS + ": Using lowprefetch connection factory");
+ conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactoryLowPrefetch");
+ }
+ else
+ {
+ conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");
+ }
+
+ //ensure the queue to be used exists and is bound
+ System.out.println(CLASS + ": Creating queue: " + queueString);
+ Connection startupConn = conFac.createConnection();
+ Session startupSess = startupConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination startupDestination = startupSess.createQueue(queueString);
+ MessageConsumer startupConsumer = startupSess.createConsumer(startupDestination);
+ startupConsumer.close();
+ startupSess.close();
+ startupConn.close();
+
+ for(int rep = 1 ; rep <= repetitions; rep++)
+ {
+ ArrayList<Connection> connectionList = new ArrayList<>();
+
+ for (int co= 1; co<= numConnections ; co++)
+ {
+ if( co % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Creating connection " + co);
+ }
+ Connection conn = conFac.createConnection();
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ jmse.printStackTrace();
+ System.exit(0);
+ }
+ });
+
+ connectionList.add(conn);
+ conn.start();
+ for (int se= 1; se<= numSessions ; se++)
+ {
+ if( se % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Creating Session " + se);
+ }
+ try
+ {
+ Session sess;
+ if(transacted)
+ {
+ sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ BytesMessage message = sess.createBytesMessage();
+
+ message.writeBytes(sentBytes);
+
+ if(!random && numMessage == 1 && numSessions == 1 && numConnections == 1 && repetitions == 1)
+ {
+ //null the array to save memory
+ sentBytes = null;
+ }
+
+ Destination destination = sess.createQueue(queueString);
+
+ MessageConsumer consumer = null;
+ for(int cns = 1 ; cns <= numConsumers ; cns++)
+ {
+ if( cns % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Creating Consumer " + cns);
+ }
+ consumer = sess.createConsumer(destination);
+ }
+
+ for(int pr = 1 ; pr <= numProducers ; pr++)
+ {
+ if( pr % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Creating Producer " + pr);
+ }
+ MessageProducer prod = sess.createProducer(destination);
+ for(int me = 1; me <= numMessage ; me++)
+ {
+ if( me % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Sending Message " + me);
+ }
+ prod.send(message, deliveryMode,
+ Message.DEFAULT_PRIORITY,
+ Message.DEFAULT_TIME_TO_LIVE);
+ if(transacted && me % txBatch == 0)
+ {
+ sess.commit();
+ }
+ }
+ }
+
+ if(numConsumers > 0 && consumeImmediately)
+ {
+ for(int cs = 1 ; cs <= numMessage ; cs++)
+ {
+ if( cs % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Consuming Message " + cs);
+ }
+ BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout);
+
+ if(transacted && cs % txBatch == 0)
+ {
+ sess.commit();
+ }
+
+ if(msg == null)
+ {
+ throw new RuntimeException("Expected message not received in allowed time: " + recieveTimeout);
+ }
+
+ validateReceivedMessageContent(sentBytes, msg, random, messageSize);
+ }
+
+ if(closeConsumers)
+ {
+ consumer.close();
+ }
+ }
+
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ exp.printStackTrace();
+ }
+
+ }
+ }
+
+ if(numConsumers == -1 && !consumeImmediately)
+ {
+ System.out.println(CLASS + ": Consuming left over messages, using recieve timeout:" + recieveTimeout);
+
+ Connection conn = conFac.createConnection();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = sess.createQueue(queueString);
+ MessageConsumer consumer = sess.createConsumer(destination);
+ conn.start();
+
+ int count = 0;
+ while(true)
+ {
+ BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout);
+
+ if(msg == null)
+ {
+ System.out.println(CLASS + ": Received " + count + " messages");
+ break;
+ }
+ else
+ {
+ count++;
+ }
+
+ validateReceivedMessageContent(sentBytes, msg, random, messageSize);
+ }
+
+ consumer.close();
+ sess.close();
+ conn.close();
+ }
+
+ if(delayClose > 0)
+ {
+ System.out.println(CLASS + ": Delaying closing connections: " + delayClose);
+ Thread.sleep(delayClose);
+ }
+
+ // Close the connections to the server
+ System.out.println(CLASS + ": Closing connections");
+
+ for(int connection = 0 ; connection < connectionList.size() ; connection++)
+ {
+ if( (connection+1) % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Closing connection " + (connection+1));
+ }
+ Connection c = connectionList.get(connection);
+ c.close();
+ }
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ ctx.close();
+ }
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ exp.printStackTrace();
+ }
+ }
+
+
+ private byte[] generateMessage(boolean random, int messageSize)
+ {
+ byte[] sentBytes = new byte[messageSize];
+ if(random)
+ {
+ //fill the array with numbers from 0-9
+ Random rand = new Random(System.currentTimeMillis());
+ for(int r = 0 ; r < messageSize ; r++)
+ {
+ sentBytes[r] = (byte) (48 + rand.nextInt(10));
+ }
+ }
+ else
+ {
+ //use sequential numbers from 0-9
+ for(int r = 0 ; r < messageSize ; r++)
+ {
+ sentBytes[r] = (byte) (48 + (r % 10));
+ }
+ }
+ return sentBytes;
+ }
+
+
+ private void validateReceivedMessageContent(byte[] sentBytes,
+ BytesMessage msg, boolean random, int messageSize) throws JMSException
+ {
+ Long length = msg.getBodyLength();
+
+ if(length != messageSize)
+ {
+ throw new RuntimeException("Incorrect number of bytes received");
+ }
+
+ byte[] recievedBytes = new byte[length.intValue()];
+ msg.readBytes(recievedBytes);
+
+ if(random)
+ {
+ if(!Arrays.equals(sentBytes, recievedBytes))
+ {
+ throw new RuntimeException("Incorrect value of bytes received");
+ }
+ }
+ else
+ {
+ for(int r = 0 ; r < messageSize ; r++)
+ {
+ if(! (recievedBytes[r] == (byte) (48 + (r % 10))))
+ {
+ throw new RuntimeException("Incorrect value of bytes received");
+ }
+ }
+ }
+ }
+}
+