summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-12-10 00:05:49 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-12-10 00:05:49 +0000
commitb8983f8dd7679688a8bdd75a47e5cf24ab2ce513 (patch)
tree93fbc183693a191aebba133f4049674ca511d00c
parent8b8ddcaa974de4387e3dd84d12a96435ceaf16b3 (diff)
downloadqpid-python-b8983f8dd7679688a8bdd75a47e5cf24ab2ce513.tar.gz
QPID-2970: Performance testing for DLQs
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1044180 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/perftests/etc/dlq/Framework.sh26
-rw-r--r--qpid/java/perftests/etc/dlq/README.txt42
-rw-r--r--qpid/java/perftests/etc/dlq/config.properties25
-rw-r--r--qpid/java/perftests/etc/dlq/log4j.xml73
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java59
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java137
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java29
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java58
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java161
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java94
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java128
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java132
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java237
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/Statistics.java106
14 files changed, 1307 insertions, 0 deletions
diff --git a/qpid/java/perftests/etc/dlq/Framework.sh b/qpid/java/perftests/etc/dlq/Framework.sh
new file mode 100755
index 0000000000..f6db87e557
--- /dev/null
+++ b/qpid/java/perftests/etc/dlq/Framework.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Setup Java CLASSPATH
+CLASSPATH=${QPID_HOME}/lib/qpid-all-${VERSION}.jar:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+
+# Run Performance Test Framework
+echo "Running DLQ Performance Tests"
+java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceFramework $*
diff --git a/qpid/java/perftests/etc/dlq/README.txt b/qpid/java/perftests/etc/dlq/README.txt
new file mode 100644
index 0000000000..8ca7a1fb7b
--- /dev/null
+++ b/qpid/java/perftests/etc/dlq/README.txt
@@ -0,0 +1,42 @@
+h1. Dead Letter Queue Performance Testing
+
+It is important to be able to understand how the addition of Dead Letter Queue
+capabilities (DLQs) affects the Qpid performance. This is because the main body
+of the work to determine whether a message should be re-delivered or enqueued
+on the DLQ takes places on the main message delivery path, and will therefore
+be called during *every* mesage transit of the broker. This has the potential
+to increase latency, and these tests are designed to allow developers to
+understand how various DLQ parameters and options affect throughput and
+latency times.
+
+The results are generated as a series of CSV text files, based on running the
+same DLQ performance test with every possible combination of a selection of
+options. It is also possible to run a sequence of tests with a static set of
+configuration options, or to run a test once, again using static options. The
+test configuration is designed to be programmatically altered dynamically,
+to allow suites of tests to be created as described above.
+
+When a sequence of tests with a particular configuration is executed, there
+are two CSV files created. The first, {{XXXX-series.csv}} contains the raw
+results of each test run in the series. The second, {{XXXX-statistics.csv}}
+has generated statistics based on the raw data, including the mean, standard
+deviation, and 95% confidence interval limits. In these files, _XXXX_ is a
+unique number, referenced in {{framework.csv}} which lists all the test ids
+along with their dynamic configuration options. Finally, logging output
+during the run shows the progress of the testing.
+
+h1. Operation
+
+The test framework classes are part of the {{org.apache.qpid.perftests.dlq}}
+package. Under this, the {{test}} subpackage contains three classes, each with
+a {{main()}} method, that can be run from the command line. Each of these takes
+the name of a configuration property file as an argument. They are:
+
+* {{PerformanceTest}} - runs a single test
+* {{PerformanceStatistics}} - runs a test mutiple times and generates statistics
+* {{PerformanceFramework}} - sets up configuration permutations to run many tests
+
+The final framework class can also be run using the {{Framework.sh}} script
+found in this directory. Additionally, a sample test configuration file,
+{{config.properties}} illustrates all configuration options, and the {{log4j.xml}}
+file can be customised to add debug logging if required.
diff --git a/qpid/java/perftests/etc/dlq/config.properties b/qpid/java/perftests/etc/dlq/config.properties
new file mode 100644
index 0000000000..1d06339905
--- /dev/null
+++ b/qpid/java/perftests/etc/dlq/config.properties
@@ -0,0 +1,25 @@
+## qpid redelivery testing
+
+# statistics properties
+repeat = 10
+
+# shared properties
+broker = tcp://magenta:5672
+#broker = tcp://localhost:5672
+maxRedelivery = 3
+maxPrefetch = 1
+session = SESSION_TRANSACTED
+queue = test
+count = 1000
+persistent = true
+maxRecords = 2000
+
+# producer properties
+size = 4096
+messageIds = true
+
+## consumer properties
+threads = 1
+listener = false
+reject = 2
+rejectCount = 3 \ No newline at end of file
diff --git a/qpid/java/perftests/etc/dlq/log4j.xml b/qpid/java/perftests/etc/dlq/log4j.xml
new file mode 100644
index 0000000000..47510a107f
--- /dev/null
+++ b/qpid/java/perftests/etc/dlq/log4j.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- ===================================================================== -->
+<!-- -->
+<!-- Log4j configuration for unit tests -->
+<!-- -->
+<!-- ===================================================================== -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="DEBUG"/>
+ <param name="ImmediateFlush" value="true"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%t %d %p [%c{4}] %m%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.apache.qpid">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="qpid.message">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="qpid.protocol">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="org.apache.commons">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="org.apache.qpid.perftests.dlq.test">
+ <level value="INFO"/>
+ </logger>
+
+ <!--
+ <logger name="org.apache.qpid.client.DeliveryCountTracker">
+ <level value="DEBUG"/>
+ </logger>
+ -->
+
+ <logger name="apache.commons.configuration.ConfigurationFactory">
+ <level value="ERROR"/>
+ </logger>
+
+ <root>
+ <level value="ERROR"/>
+ <appender-ref ref="console" />
+ </root>
+</log4j:configuration>
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
new file mode 100644
index 0000000000..aa7b510659
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java
@@ -0,0 +1,59 @@
+package org.apache.qpid.perftests.dlq.client;
+
+import static org.apache.qpid.perftests.dlq.client.Config.*;
+
+import java.util.Properties;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.server.queue.AMQQueueFactory;
+
+public class Check extends Client
+{
+ private MessageConsumer _consumer;
+ private int _reject;
+ private int _check;
+
+ public Check(Properties props)
+ {
+ super(props);
+ }
+
+ public void init()
+ {
+ super.init();
+
+ _queueName = _props.getProperty(QUEUE) + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
+ _reject = Integer.parseInt(_props.getProperty(REJECT));
+ _sessionType = Session.AUTO_ACKNOWLEDGE;
+ _transacted = false;
+ _clientAck = false;
+ }
+
+ public void start() throws Exception
+ {
+ _consumer = _session.createConsumer(_queue);
+
+ _connection.start();
+ }
+
+ public Integer call() throws Exception {
+ start();
+
+ Message msg;
+ while ((msg = _consumer.receive(1000)) != null)
+ {
+ int number = msg.getIntProperty("number");
+ boolean rejectMessage = (number % _reject) == 0;
+ if (!rejectMessage)
+ {
+ throw new RuntimeException("unexpected message on dlq: " + number);
+ }
+ _check++;
+ }
+ return Integer.valueOf(_check);
+ }
+}
+
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
new file mode 100644
index 0000000000..c87ceae942
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
@@ -0,0 +1,137 @@
+package org.apache.qpid.perftests.dlq.client;
+
+import static org.apache.qpid.perftests.dlq.client.Config.*;
+
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.configuration.ClientProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class Client implements Callable<Integer>
+{
+ protected static final Logger _log = LoggerFactory.getLogger(Client.class);
+
+ protected Properties _props;
+
+ protected String _broker;
+ protected int _maxRedelivery;
+ protected int _maxPrefetch;
+ protected int _sessionType;
+ protected boolean _transacted;
+ protected boolean _clientAck;
+ protected String _queueName;
+ protected int _count;
+ protected boolean _messageIds;
+ protected boolean _persistent;
+ protected int _size;
+ protected int _threads;
+ protected int _maxRecords;
+
+ protected Connection _connection;
+ protected Session _session;
+ protected Destination _queue;
+ protected String _client = "client";
+
+ public Client(Properties props)
+ {
+ _props = props;
+
+ init();
+ }
+
+ public void init()
+ {
+ _broker = _props.getProperty(BROKER);
+ _maxRedelivery = Integer.parseInt(_props.getProperty(MAX_REDELIVERY));
+ _maxPrefetch = Integer.parseInt(_props.getProperty(MAX_PREFETCH));
+ _sessionType = getSessionType(_props.getProperty(SESSION));
+ _transacted = _sessionType == Session.SESSION_TRANSACTED;
+ _clientAck = _sessionType == Session.CLIENT_ACKNOWLEDGE;
+ _queueName = _props.getProperty(QUEUE);
+ _persistent = Boolean.parseBoolean(_props.getProperty(PERSISTENT));
+ _count = Integer.parseInt(_props.getProperty(COUNT));
+ _size = Integer.parseInt(_props.getProperty(SIZE));
+ _messageIds = !Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS));
+ _threads = Integer.parseInt(_props.getProperty(THREADS));
+ _maxRecords = Integer.parseInt(_props.getProperty(MAX_RECORDS));
+ }
+
+ public void shutdown()
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ _log.error("failed shutting down the connection", e);
+ }
+ }
+
+ public int getSessionType(String sessionType)
+ {
+ if (sessionType == null || sessionType.length() == 0)
+ {
+ throw new RuntimeException("empty or missing session property");
+ }
+ else if (sessionType.equalsIgnoreCase(SESSION_TRANSACTED))
+ {
+ return Session.SESSION_TRANSACTED;
+ }
+ else if (sessionType.equalsIgnoreCase(AUTO_ACKNOWLEDGE))
+ {
+ return Session.AUTO_ACKNOWLEDGE;
+ }
+ else if (sessionType.equalsIgnoreCase(CLIENT_ACKNOWLEDGE))
+ {
+ return Session.CLIENT_ACKNOWLEDGE;
+ }
+ else if (sessionType.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE))
+ {
+ return Session.DUPS_OK_ACKNOWLEDGE;
+ }
+ throw new RuntimeException("session property not recognised: " + sessionType);
+ }
+
+ public void connect()
+ {
+ String url = "amqp://guest:guest@" + _client + "/test?brokerlist='" + _broker + "'&maxprefetch='" + _maxPrefetch + "'&maxdeliverycount='" + _maxRedelivery + "'";
+ System.setProperty(ClientProperties.MAX_DELIVERY_RECORDS_PROP_NAME, Integer.toString(_maxRecords));
+
+ try
+ {
+ _connection = new AMQConnection(url);
+ _session = _connection.createSession(_transacted, _sessionType);
+ _queue = _session.createQueue(_queueName);
+ _connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ _log.error("jms exception received", e);
+ System.exit(0);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ _log.error("Unable to setup connection, client and producer on broker", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public abstract void start() throws Exception;
+
+ public Integer call() throws Exception {
+ start();
+ return -1;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
new file mode 100644
index 0000000000..343c4134a0
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java
@@ -0,0 +1,29 @@
+package org.apache.qpid.perftests.dlq.client;
+
+public interface Config
+{
+ String BROKER = "broker";
+ String MAX_REDELIVERY = "maxRedelivery";
+ String MAX_PREFETCH = "maxPrefetch";
+ String SESSION = "session";
+ String QUEUE = "queue";
+ String PERSISTENT = "persistent";
+ String COUNT = "count";
+ String SIZE = "size";
+ String MESSAGE_IDS = "messageIds";
+ String THREADS = "threads";
+ String MAX_RECORDS = "maxRecords";
+ String LISTENER = "listener";
+ String REJECT = "reject";
+ String REJECT_COUNT = "rejectCount";
+ String REPEAT = "repeat";
+
+ String SESSION_TRANSACTED = "SESSION_TRANSACTED";
+ String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
+ String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
+ String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
+
+ String[] SESSION_VALUES = new String[] {
+ SESSION_TRANSACTED, AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE
+ };
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
new file mode 100644
index 0000000000..310591b85b
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java
@@ -0,0 +1,58 @@
+package org.apache.qpid.perftests.dlq.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+
+public class Create extends Client
+{
+ private MessageConsumer _consumer;
+
+ public Create(Properties props)
+ {
+ super(props);
+ }
+
+ public void init()
+ {
+ super.init();
+
+ _sessionType = Session.AUTO_ACKNOWLEDGE;
+ _transacted = false;
+ _clientAck = false;
+ }
+
+ public void start() throws Exception
+ {
+ _connection.start();
+
+ BindingURL burl = new AMQBindingURL("direct://amq.direct//" + _queueName + "?maxdeliverycount='" + _maxRedelivery + "'");
+ _queue = new AMQQueue(burl);
+
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true);
+
+ ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, false, false, arguments);
+ ((AMQSession<?,?>) _session).declareAndBind((AMQDestination) new AMQQueue("amq.direct", _queueName));
+
+ _consumer = _session.createConsumer(_queue);
+ while (_consumer.receive(1000) != null);
+ _consumer.close();
+
+ _queue = _session.createQueue(_queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ _consumer = _session.createConsumer(_queue);
+ while (_consumer.receive(1000) != null);
+ }
+}
+
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
new file mode 100644
index 0000000000..1eb3c47983
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
@@ -0,0 +1,161 @@
+package org.apache.qpid.perftests.dlq.client;
+
+import static org.apache.qpid.perftests.dlq.client.Config.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.qpid.perftests.dlq.test.PerformanceTest;
+
+
+
+public class Receiver extends Client
+{
+ private MessageConsumer _consumer;
+ private boolean _listener;
+ private int _reject;
+ private int _rejectCount;
+ private Map<Integer, Integer> _rejected = new HashMap<Integer, Integer>();
+
+ private static volatile boolean _stopped;
+ private static CountDownLatch _finished;
+ private static AtomicInteger _id;
+ private static AtomicInteger _received;
+
+ public Receiver(Properties props)
+ {
+ super(props);
+
+ _client = String.format("%04d", _id.incrementAndGet());
+ }
+
+ public static void reset()
+ {
+ _id = new AtomicInteger(0);
+ _received = new AtomicInteger(0);
+ _finished = new CountDownLatch(1);
+ _stopped = false;
+ }
+
+ public void start() throws Exception
+ {
+ _listener = Boolean.parseBoolean(_props.getProperty(LISTENER));
+ _reject = Integer.parseInt(_props.getProperty(REJECT));
+ _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT));
+
+ _consumer = _session.createConsumer(_queue);
+
+ _connection.start();
+ }
+
+ public void startListener() throws Exception
+ {
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message msg)
+ {
+ processMessage(msg);
+ }
+ });
+ }
+
+ public void startReceiver() throws Exception
+ {
+ while (!_stopped)
+ {
+ Message msg = _consumer.receive(1000);
+ processMessage(msg);
+ }
+ }
+
+ public void processMessage(Message msg)
+ {
+ try
+ {
+ _received.incrementAndGet();
+ int number = msg.getIntProperty("number");
+ if (number % 100 == 0)
+ {
+ _log.info("client " + _client + " got message " + number);
+ }
+
+ boolean rejectMessage = (number % _reject) == 0;
+ if (rejectMessage)
+ {
+ int rejectCount = 0;
+ if (_rejected.containsKey(number))
+ {
+ rejectCount = _rejected.get(number);
+ }
+ _rejected.put(number, ++rejectCount);
+ if (rejectCount <= _rejectCount)
+ {
+ if (rejectCount >= _maxRedelivery)
+ {
+ _log.info("rejecting message (" + rejectCount + ") " + msg.getJMSMessageID());
+ }
+ if (_transacted)
+ {
+ _session.rollback();
+ }
+ else
+ {
+ _session.recover();
+ }
+ }
+ else
+ {
+ rejectMessage = false;
+ }
+ }
+
+ if (!rejectMessage)
+ {
+ if (_transacted)
+ {
+ _session.commit();
+ }
+ else if (_clientAck)
+ {
+ msg.acknowledge();
+ }
+ if (number == (_count - 1))
+ {
+ _stopped = true;
+ _finished.countDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _log.error("failed to process message", e);
+ _stopped = true;
+ _finished.countDown();
+ }
+ }
+
+ public Integer call() throws Exception
+ {
+ start();
+
+ if (_listener)
+ {
+ startListener();
+ }
+ else
+ {
+ startReceiver();
+ }
+
+ _finished.await();
+ PerformanceTest.countDown();
+ return _received.get();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
new file mode 100644
index 0000000000..e6c2ff49e2
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java
@@ -0,0 +1,94 @@
+package org.apache.qpid.perftests.dlq.client;
+
+import java.util.Properties;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class Sender extends Client
+{
+ public Sender(Properties props)
+ {
+ super(props);
+ }
+
+ private MessageProducer _producer;
+
+ public void init()
+ {
+ super.init();
+
+ _sessionType = Session.AUTO_ACKNOWLEDGE;
+ _transacted = false;
+ _clientAck = false;
+ }
+
+ public Integer call() throws Exception {
+ start();
+
+ Message msg = createMessage();
+ int sent = sendMessages(msg);
+
+ return Integer.valueOf(sent);
+ }
+
+ public void start() throws Exception
+ {
+ _producer = _session.createProducer(_queue);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ _producer.setDisableMessageID(_messageIds);
+
+ _connection.start();
+ }
+
+ public Message createMessage()
+ {
+ // Setup the message to send
+ TextMessage msg;
+ try
+ {
+ byte[] bytes = new byte[_count];
+ for (int b = 0; b < bytes.length; b++)
+ {
+ bytes[b] = (byte) "ABCDEFGHIJKLMNOPQRSTUVWXYZ".charAt(b % 26);
+ }
+ String content = new String(bytes);
+ //Now create the actual message you want to send
+ msg = _session.createTextMessage(content);
+ return msg;
+ }
+ catch (JMSException e)
+ {
+ _log.error("Unable to create message", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int sendMessages(Message msg)
+ {
+ for (int i = 0; i < _count; i++)
+ {
+ if (i % 100 == 0)
+ {
+ _log.info("message " + i);
+ }
+ try
+ {
+ msg.setIntProperty("number", i);
+ _producer.send(msg);
+ }
+ catch (JMSException e)
+ {
+ _log.error("jms exception in sender", e);
+ return i;
+ }
+ }
+
+ return _count;
+ }
+}
+
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
new file mode 100644
index 0000000000..3db257362c
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
@@ -0,0 +1,128 @@
+package org.apache.qpid.perftests.dlq.test;
+
+import static org.apache.qpid.perftests.dlq.client.Config.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceFramework
+{
+ private static final Logger _log = LoggerFactory.getLogger(PerformanceFramework.class);
+ private static final String _date = new SimpleDateFormat("yyyyMMdd").format(new Date());
+
+ private static File _dir;
+
+ private Properties _props;
+ private int _id = 0;
+
+ public PerformanceFramework(File propertyFile)
+ {
+ try
+ {
+ InputStream input = new FileInputStream(propertyFile);
+ _props = new Properties();
+ _props.load(input);
+ }
+ catch (IOException ioe)
+ {
+ throw new RuntimeException("file error with " + propertyFile.getName());
+ }
+ }
+
+ public PerformanceFramework(Properties props)
+ {
+ _props = props;
+ }
+
+ public void runAll(File file) throws Exception
+ {
+ _log.info("starting test framework");
+ PrintStream out = new PrintStream(new FileOutputStream(file));
+ out.println("id,maxRedelivery,rejectCount,messageIds,listener,session");
+ for (int maxRedelivery = 0; maxRedelivery < 4; maxRedelivery++)
+ {
+ _props.setProperty(MAX_REDELIVERY, Integer.toString(maxRedelivery));
+ for (int rejectCount = 0; rejectCount < maxRedelivery + 1; rejectCount++)
+ {
+ _props.setProperty(REJECT_COUNT, Integer.toString(rejectCount));
+ for (int messageIds = 0; messageIds < 2; messageIds++)
+ {
+ _props.setProperty(MESSAGE_IDS, Boolean.toString(messageIds == 0));
+ for (int listener = 0; listener < 2; listener++)
+ {
+ _props.setProperty(LISTENER, Boolean.toString(listener == 0));
+ for (int session = 0; session < 4; session++)
+ {
+ _props.setProperty(SESSION, SESSION_VALUES[session]);
+ _id++;
+ out.println(_id + "," + maxRedelivery + "," + rejectCount + "," +
+ Boolean.toString(messageIds == 0) + "," + Boolean.toString(listener == 0) +
+ "," + SESSION_VALUES[session]);
+ runOnce(_id);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void runOnce(int id)
+ {
+ PerformanceStatistics stats = new PerformanceStatistics(_props);
+ try
+ {
+ _log.info("starting test id " + id);
+ String fileId = String.format("%04d", id);
+ stats.series(new File(_dir, fileId + "-series.csv"));
+ _log.info("test id " + id + " completed ok");
+ stats.statistics(new File(_dir, fileId + "-statistics.csv"));
+ }
+ catch (Exception e)
+ {
+ _log.error("failed test id " + id + " with error", e);
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ if (argv.length != 1)
+ {
+ throw new IllegalArgumentException("must pass name of property file as argument");
+ }
+
+ File propertyFile = new File(argv[0]);
+ if (!propertyFile.exists() || !propertyFile.canRead())
+ {
+ throw new RuntimeException("property file '" + propertyFile.getAbsolutePath() + "' must exist and be readable");
+ }
+
+ int dirId = 0;
+ while (true)
+ {
+ _dir = new File(String.format("%s-%02d", _date, dirId++));
+ if (_dir.exists())
+ {
+ continue;
+ }
+ else
+ {
+ _dir.mkdir();
+ break;
+ }
+ }
+
+ PerformanceFramework framework = new PerformanceFramework(propertyFile);
+ framework.runAll(new File(_dir, "framework.csv"));
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
new file mode 100644
index 0000000000..a3f6a3fcba
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
@@ -0,0 +1,132 @@
+package org.apache.qpid.perftests.dlq.test;
+
+import static org.apache.qpid.perftests.dlq.client.Config.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerformanceStatistics
+{
+ private static final Logger _log = LoggerFactory.getLogger(PerformanceStatistics.class);
+
+ private Properties _props;
+ private List<Double> _sent = new ArrayList<Double>();
+ private List<Double> _received = new ArrayList<Double>();
+ private List<Double> _rejected = new ArrayList<Double>();
+ private List<Double> _duration = new ArrayList<Double>();
+ private List<Double> _throughputIn = new ArrayList<Double>();
+ private List<Double> _throughputOut = new ArrayList<Double>();
+ private List<Double> _bandwidthIn = new ArrayList<Double>();
+ private List<Double> _bandwidthOut = new ArrayList<Double>();
+ private List<Double> _latency = new ArrayList<Double>();
+ private List<Statistics> _statistics = new ArrayList<Statistics>();
+
+ public PerformanceStatistics(File propertyFile)
+ {
+ try
+ {
+ InputStream input = new FileInputStream(propertyFile);
+ _props = new Properties();
+ _props.load(input);
+ }
+ catch (IOException ioe)
+ {
+ throw new RuntimeException("file error with " + propertyFile.getName());
+ }
+ }
+
+ public PerformanceStatistics(Properties props)
+ {
+ _props = props;
+ }
+
+ public void single(PrintStream out) throws Exception
+ {
+ PerformanceTest client = new PerformanceTest(_props);
+ client.test();
+ client.check(out);
+ _sent.add(client.getSent());
+ _received.add(client.getReceived());
+ _rejected.add(client.getRejected());
+ _duration.add(client.getDuration());
+ _throughputIn.add(client.getThroughputIn());
+ _throughputOut.add(client.getThroughputOut());
+ _bandwidthIn.add(client.getBandwidthIn());
+ _bandwidthOut.add(client.getBandwidthOut());
+ _latency.add(client.getLatency());
+ }
+
+ public void series(File file) throws Exception
+ {
+ try
+ {
+ PrintStream out = new PrintStream(new FileOutputStream(file));
+ out.println(PerformanceTest.getHeader());
+ int repeat = Integer.parseInt(_props.getProperty(REPEAT));
+ for (int i = 0; i < repeat; i++)
+ {
+ _log.info("starting individual test run " + i);
+ single(out);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("error running test series", e);
+ }
+
+ _statistics.add(new Statistics(_sent, "sent"));
+ _statistics.add(new Statistics(_received, "received"));
+ _statistics.add(new Statistics(_rejected, "rejected"));
+ _statistics.add(new Statistics(_duration, "duration"));
+ _statistics.add(new Statistics(_throughputIn, "throughputIn"));
+ _statistics.add(new Statistics(_throughputOut, "throughputOut"));
+ _statistics.add(new Statistics(_bandwidthIn, "bandwidthIn"));
+ _statistics.add(new Statistics(_bandwidthOut, "bandwidthOut"));
+ _statistics.add(new Statistics(_latency, "latency"));
+ }
+
+ public void statistics(File file)
+ {
+ try
+ {
+ PrintStream out = new PrintStream(new FileOutputStream(file));
+ out.println(Statistics.getHeader());
+ for (Statistics stats : _statistics)
+ {
+ out.println(stats.toString());
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("error outputting stats", e);
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ if (argv.length != 1)
+ {
+ throw new IllegalArgumentException("must pass name of property file as argument");
+ }
+
+ File propertyFile = new File(argv[0]);
+ if (!propertyFile.exists() || !propertyFile.canRead())
+ {
+ throw new RuntimeException("property file '" + propertyFile.getAbsolutePath() + "' must exist and be readable");
+ }
+
+ PerformanceStatistics stats = new PerformanceStatistics(propertyFile);
+ stats.series(new File("series.csv"));
+ stats.statistics(new File("statistics.csv"));
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
new file mode 100644
index 0000000000..a8c7e7a905
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
@@ -0,0 +1,237 @@
+package org.apache.qpid.perftests.dlq.test;
+
+import static org.apache.qpid.perftests.dlq.client.Config.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.qpid.perftests.dlq.client.Check;
+import org.apache.qpid.perftests.dlq.client.Create;
+import org.apache.qpid.perftests.dlq.client.Receiver;
+import org.apache.qpid.perftests.dlq.client.Sender;
+
+
+public class PerformanceTest
+{
+ private static CountDownLatch _latch;
+
+ private ExecutorService _executor;
+ private Properties _props;
+ private int _size = 0;
+ private int _threads = 0;
+ private int _sent = 0;
+ private int _received = 0;
+ private int _rejected = 0;
+ private long _started = 0;
+ private long _finished = 0;
+
+ private String _session;
+ private int _count;
+ private int _reject;
+ private int _rejectCount;
+ private int _maxRedelivery;
+ private boolean _messageIds;
+ private boolean _listener;
+
+ public PerformanceTest(File propertyFile)
+ {
+ try
+ {
+ InputStream input = new FileInputStream(propertyFile);
+ _props = new Properties();
+ _props.load(input);
+ }
+ catch (IOException ioe)
+ {
+ throw new RuntimeException("file error with " + propertyFile.getName());
+ }
+ }
+
+ public PerformanceTest(Properties props)
+ {
+ _props = props;
+ }
+
+ public void test() throws Exception
+ {
+ Create create = new Create(_props);
+ create.connect();
+ create.call();
+ create.shutdown();
+
+ _executor = Executors.newCachedThreadPool();
+ _threads = Integer.parseInt(_props.getProperty(THREADS));
+ _size = Integer.parseInt(_props.getProperty(SIZE));
+ _count = Integer.parseInt(_props.getProperty(COUNT));
+ _reject = Integer.parseInt(_props.getProperty(REJECT));
+ _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT));
+ _maxRedelivery = Integer.parseInt(_props.getProperty(MAX_REDELIVERY));
+ _session = _props.getProperty(SESSION);
+ _listener = Boolean.parseBoolean(_props.getProperty(LISTENER));
+ _messageIds = Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS));
+ _latch = new CountDownLatch(1);
+ _started = System.nanoTime();
+
+ Sender sender = new Sender(_props);
+ sender.connect();
+ Future<Integer> send = _executor.submit(sender);
+
+ Receiver.reset();
+ List<Future<Integer>> receives = new ArrayList<Future<Integer>>();
+ List<Receiver> receivers = new ArrayList<Receiver>();
+ for (int i = 0; i < _threads; i++)
+ {
+ Receiver receiver = new Receiver(_props);
+ receiver.connect();
+ receivers.add(receiver);
+ receives.add(_executor.submit(receiver));
+ }
+
+ try
+ {
+ _latch.await();
+ _finished = System.nanoTime();
+ _sent = send.get();
+ for (Future<Integer> receive : receives)
+ {
+ _received += receive.get();
+ }
+
+ Check check = new Check(_props);
+ check.connect();
+ _rejected = check.call();
+ check.shutdown();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("error running tests", e);
+ }
+ finally
+ {
+ sender.shutdown();
+ for (Receiver receiver : receivers)
+ {
+ receiver.shutdown();
+ }
+ _executor.shutdownNow();
+ }
+ }
+
+ public void check(PrintStream out)
+ {
+ StringBuilder error = new StringBuilder();
+ if (_sent != _count)
+ {
+ error.append("sent ").append(_sent).append(" not ").append(_count).append('\n');
+ }
+ boolean sessionOk = ((_session.equalsIgnoreCase(CLIENT_ACKNOWLEDGE)) || (_session.equalsIgnoreCase(SESSION_TRANSACTED)) ||
+ ((_session.equalsIgnoreCase(AUTO_ACKNOWLEDGE) || _session.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE)) && _listener));
+ int rejected = (!sessionOk || !_messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject;
+ if (_rejected != rejected)
+ {
+ error.append("rejected ").append(_rejected).append(" not ").append(rejected).append('\n');
+ }
+ int received = (_count - rejected) + (sessionOk ? ((_count / _reject) * _rejectCount) : 0);
+ if (_received != received)
+ {
+ error.append("received ").append(_received).append(" not ").append(received).append('\n');
+ }
+ if (error.length() > 0)
+ {
+ out.print(error.toString());
+ }
+ else
+ {
+ out.println(toString());
+ }
+ }
+
+ public static String getHeader()
+ {
+ return "sent,received,rejected,duration";
+ }
+
+ public String toString()
+ {
+ String results = String.format("%d,%d,%d,%f", _sent, _received, _rejected, getDuration());
+ return results;
+ }
+
+ public double getSent()
+ {
+ return (double) _sent;
+ }
+
+ public double getReceived()
+ {
+ return (double) _received;
+ }
+
+ public double getDuration()
+ {
+ return ((double) _finished - (double) _started) / 1000000000.0d;
+ }
+
+ public double getRejected()
+ {
+ return (double) _rejected;
+ }
+
+ public double getThroughputIn()
+ {
+ return getSent() / getDuration();
+ }
+
+ public double getThroughputOut()
+ {
+ return getReceived() / getDuration();
+ }
+
+ public double getBandwidthIn()
+ {
+ return (getSent() * (double) _size) / getDuration();
+ }
+
+ public double getBandwidthOut()
+ {
+ return (getReceived() * (double) _size) / getDuration();
+ }
+
+ public double getLatency()
+ {
+ return getDuration() / getReceived();
+ }
+
+ public static void countDown()
+ {
+ _latch.countDown();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ if (argv.length != 1)
+ {
+ throw new IllegalArgumentException("must pass name of propert file as argument");
+ }
+
+ File propertyFile = new File(argv[0]);
+ if (!propertyFile.exists() || !propertyFile.canRead())
+ {
+ throw new RuntimeException("property file '" + propertyFile.getName() + "' must exist and be readable");
+ }
+ PerformanceTest client = new PerformanceTest(propertyFile);
+ client.test();
+ client.check(System.out);
+ }
+}
+
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/Statistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/Statistics.java
new file mode 100644
index 0000000000..4804aaaa42
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/Statistics.java
@@ -0,0 +1,106 @@
+package org.apache.qpid.perftests.dlq.test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class Statistics
+{
+ private String _data;
+ private double _mean;
+ private double _standardDeviation;
+ private double _tValue;
+ private double _min = Double.MAX_VALUE;
+ private double _max = Double.MIN_VALUE;
+
+ private List<Double> tDistribution95 = Arrays.asList(
+ 12.70620473d, 4.30265273d, 3.182446305d, 2.776445105d, 2.570581835d,
+ 2.446911846d, 2.364624251d, 2.306004133d, 2.262157158d, 2.228138842d,
+ 2.200985159d, 2.178812827d, 2.160368652d, 2.144786681d, 2.131449536d,
+ 2.119905285d, 2.109815559d, 2.100922037d, 2.09302405d, 2.085963441d,
+ 2.079613837d, 2.073873058d, 2.068657599d, 2.063898547d, 2.059538536d
+ );
+
+ public Statistics(List<Double> samples, String data)
+ {
+ _data = data;
+
+ double n = (double) samples.size();
+ double total = 0.0d;
+ for (Double s : samples)
+ {
+ total += s;
+ _min = Math.min(_min, s);
+ _max = Math.max(_max, s);
+ }
+ _mean = total / n;
+ double deviationSquared = 0.0d;
+ for (Double s : samples)
+ {
+ double deviation = s - _mean;
+ deviationSquared += (deviation * deviation);
+ }
+ _standardDeviation = Math.sqrt(deviationSquared / (n - 1));
+ _tValue = tDistribution95.get((int) (n - 2)) * (_standardDeviation / Math.sqrt(n));
+ }
+
+ public String getData()
+ {
+ return _data;
+ }
+
+ public double getMean()
+ {
+ return _mean;
+ }
+
+ public double getMin()
+ {
+ return _min;
+ }
+
+ public double getMax()
+ {
+ return _max;
+ }
+
+ public double getStandardDeviation()
+ {
+ return _standardDeviation;
+ }
+
+ public double getIntervalMin()
+ {
+ return _mean - _tValue;
+ }
+
+ public double getIntervalMax()
+ {
+ return _mean + _tValue;
+ }
+
+ public double getInterval()
+ {
+ return 2.0d + _tValue;
+ }
+
+ public double getRange()
+ {
+ return getMax() - getMin();
+ }
+
+ public static String getHeader()
+ {
+ return "data,mean,min,max,range,stdev,min95,max95,int95";
+ }
+
+ public String toString()
+ {
+ String results = String.format("%s,%f,%f,%f,%f,%f,%f,%f,%f",
+ _data,
+ getMean(),
+ getMin(), getMax(), getRange(),
+ getStandardDeviation(),
+ getIntervalMin(), getIntervalMax(), getInterval());
+ return results;
+ }
+}