diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-10 00:05:49 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-10 00:05:49 +0000 |
commit | b8983f8dd7679688a8bdd75a47e5cf24ab2ce513 (patch) | |
tree | 93fbc183693a191aebba133f4049674ca511d00c | |
parent | 8b8ddcaa974de4387e3dd84d12a96435ceaf16b3 (diff) | |
download | qpid-python-b8983f8dd7679688a8bdd75a47e5cf24ab2ce513.tar.gz |
QPID-2970: Performance testing for DLQs
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1044180 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 1307 insertions, 0 deletions
diff --git a/qpid/java/perftests/etc/dlq/Framework.sh b/qpid/java/perftests/etc/dlq/Framework.sh new file mode 100755 index 0000000000..f6db87e557 --- /dev/null +++ b/qpid/java/perftests/etc/dlq/Framework.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Setup Java CLASSPATH +CLASSPATH=${QPID_HOME}/lib/qpid-all-${VERSION}.jar:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar + +# Run Performance Test Framework +echo "Running DLQ Performance Tests" +java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceFramework $* diff --git a/qpid/java/perftests/etc/dlq/README.txt b/qpid/java/perftests/etc/dlq/README.txt new file mode 100644 index 0000000000..8ca7a1fb7b --- /dev/null +++ b/qpid/java/perftests/etc/dlq/README.txt @@ -0,0 +1,42 @@ +h1. Dead Letter Queue Performance Testing + +It is important to be able to understand how the addition of Dead Letter Queue +capabilities (DLQs) affects the Qpid performance. This is because the main body +of the work to determine whether a message should be re-delivered or enqueued +on the DLQ takes places on the main message delivery path, and will therefore +be called during *every* mesage transit of the broker. This has the potential +to increase latency, and these tests are designed to allow developers to +understand how various DLQ parameters and options affect throughput and +latency times. + +The results are generated as a series of CSV text files, based on running the +same DLQ performance test with every possible combination of a selection of +options. It is also possible to run a sequence of tests with a static set of +configuration options, or to run a test once, again using static options. The +test configuration is designed to be programmatically altered dynamically, +to allow suites of tests to be created as described above. + +When a sequence of tests with a particular configuration is executed, there +are two CSV files created. The first, {{XXXX-series.csv}} contains the raw +results of each test run in the series. The second, {{XXXX-statistics.csv}} +has generated statistics based on the raw data, including the mean, standard +deviation, and 95% confidence interval limits. In these files, _XXXX_ is a +unique number, referenced in {{framework.csv}} which lists all the test ids +along with their dynamic configuration options. Finally, logging output +during the run shows the progress of the testing. + +h1. Operation + +The test framework classes are part of the {{org.apache.qpid.perftests.dlq}} +package. Under this, the {{test}} subpackage contains three classes, each with +a {{main()}} method, that can be run from the command line. Each of these takes +the name of a configuration property file as an argument. They are: + +* {{PerformanceTest}} - runs a single test +* {{PerformanceStatistics}} - runs a test mutiple times and generates statistics +* {{PerformanceFramework}} - sets up configuration permutations to run many tests + +The final framework class can also be run using the {{Framework.sh}} script +found in this directory. Additionally, a sample test configuration file, +{{config.properties}} illustrates all configuration options, and the {{log4j.xml}} +file can be customised to add debug logging if required. diff --git a/qpid/java/perftests/etc/dlq/config.properties b/qpid/java/perftests/etc/dlq/config.properties new file mode 100644 index 0000000000..1d06339905 --- /dev/null +++ b/qpid/java/perftests/etc/dlq/config.properties @@ -0,0 +1,25 @@ +## qpid redelivery testing + +# statistics properties +repeat = 10 + +# shared properties +broker = tcp://magenta:5672 +#broker = tcp://localhost:5672 +maxRedelivery = 3 +maxPrefetch = 1 +session = SESSION_TRANSACTED +queue = test +count = 1000 +persistent = true +maxRecords = 2000 + +# producer properties +size = 4096 +messageIds = true + +## consumer properties +threads = 1 +listener = false +reject = 2 +rejectCount = 3
\ No newline at end of file diff --git a/qpid/java/perftests/etc/dlq/log4j.xml b/qpid/java/perftests/etc/dlq/log4j.xml new file mode 100644 index 0000000000..47510a107f --- /dev/null +++ b/qpid/java/perftests/etc/dlq/log4j.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<!-- ===================================================================== --> +<!-- --> +<!-- Log4j configuration for unit tests --> +<!-- --> +<!-- ===================================================================== --> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <param name="Target" value="System.out"/> + <param name="Threshold" value="DEBUG"/> + <param name="ImmediateFlush" value="true"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%t %d %p [%c{4}] %m%n"/> + </layout> + </appender> + + <logger name="org.apache.qpid"> + <level value="ERROR"/> + </logger> + + <logger name="qpid.message"> + <level value="ERROR"/> + </logger> + + <logger name="qpid.protocol"> + <level value="ERROR"/> + </logger> + + <logger name="org.apache.commons"> + <level value="ERROR"/> + </logger> + + <logger name="org.apache.qpid.perftests.dlq.test"> + <level value="INFO"/> + </logger> + + <!-- + <logger name="org.apache.qpid.client.DeliveryCountTracker"> + <level value="DEBUG"/> + </logger> + --> + + <logger name="apache.commons.configuration.ConfigurationFactory"> + <level value="ERROR"/> + </logger> + + <root> + <level value="ERROR"/> + <appender-ref ref="console" /> + </root> +</log4j:configuration> diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java new file mode 100644 index 0000000000..aa7b510659 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Check.java @@ -0,0 +1,59 @@ +package org.apache.qpid.perftests.dlq.client; + +import static org.apache.qpid.perftests.dlq.client.Config.*; + +import java.util.Properties; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.server.queue.AMQQueueFactory; + +public class Check extends Client +{ + private MessageConsumer _consumer; + private int _reject; + private int _check; + + public Check(Properties props) + { + super(props); + } + + public void init() + { + super.init(); + + _queueName = _props.getProperty(QUEUE) + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; + _reject = Integer.parseInt(_props.getProperty(REJECT)); + _sessionType = Session.AUTO_ACKNOWLEDGE; + _transacted = false; + _clientAck = false; + } + + public void start() throws Exception + { + _consumer = _session.createConsumer(_queue); + + _connection.start(); + } + + public Integer call() throws Exception { + start(); + + Message msg; + while ((msg = _consumer.receive(1000)) != null) + { + int number = msg.getIntProperty("number"); + boolean rejectMessage = (number % _reject) == 0; + if (!rejectMessage) + { + throw new RuntimeException("unexpected message on dlq: " + number); + } + _check++; + } + return Integer.valueOf(_check); + } +} + diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java new file mode 100644 index 0000000000..c87ceae942 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java @@ -0,0 +1,137 @@ +package org.apache.qpid.perftests.dlq.client; + +import static org.apache.qpid.perftests.dlq.client.Config.*; + +import java.util.Properties; +import java.util.concurrent.Callable; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.configuration.ClientProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class Client implements Callable<Integer> +{ + protected static final Logger _log = LoggerFactory.getLogger(Client.class); + + protected Properties _props; + + protected String _broker; + protected int _maxRedelivery; + protected int _maxPrefetch; + protected int _sessionType; + protected boolean _transacted; + protected boolean _clientAck; + protected String _queueName; + protected int _count; + protected boolean _messageIds; + protected boolean _persistent; + protected int _size; + protected int _threads; + protected int _maxRecords; + + protected Connection _connection; + protected Session _session; + protected Destination _queue; + protected String _client = "client"; + + public Client(Properties props) + { + _props = props; + + init(); + } + + public void init() + { + _broker = _props.getProperty(BROKER); + _maxRedelivery = Integer.parseInt(_props.getProperty(MAX_REDELIVERY)); + _maxPrefetch = Integer.parseInt(_props.getProperty(MAX_PREFETCH)); + _sessionType = getSessionType(_props.getProperty(SESSION)); + _transacted = _sessionType == Session.SESSION_TRANSACTED; + _clientAck = _sessionType == Session.CLIENT_ACKNOWLEDGE; + _queueName = _props.getProperty(QUEUE); + _persistent = Boolean.parseBoolean(_props.getProperty(PERSISTENT)); + _count = Integer.parseInt(_props.getProperty(COUNT)); + _size = Integer.parseInt(_props.getProperty(SIZE)); + _messageIds = !Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS)); + _threads = Integer.parseInt(_props.getProperty(THREADS)); + _maxRecords = Integer.parseInt(_props.getProperty(MAX_RECORDS)); + } + + public void shutdown() + { + try + { + _connection.close(); + } + catch (JMSException e) + { + _log.error("failed shutting down the connection", e); + } + } + + public int getSessionType(String sessionType) + { + if (sessionType == null || sessionType.length() == 0) + { + throw new RuntimeException("empty or missing session property"); + } + else if (sessionType.equalsIgnoreCase(SESSION_TRANSACTED)) + { + return Session.SESSION_TRANSACTED; + } + else if (sessionType.equalsIgnoreCase(AUTO_ACKNOWLEDGE)) + { + return Session.AUTO_ACKNOWLEDGE; + } + else if (sessionType.equalsIgnoreCase(CLIENT_ACKNOWLEDGE)) + { + return Session.CLIENT_ACKNOWLEDGE; + } + else if (sessionType.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE)) + { + return Session.DUPS_OK_ACKNOWLEDGE; + } + throw new RuntimeException("session property not recognised: " + sessionType); + } + + public void connect() + { + String url = "amqp://guest:guest@" + _client + "/test?brokerlist='" + _broker + "'&maxprefetch='" + _maxPrefetch + "'&maxdeliverycount='" + _maxRedelivery + "'"; + System.setProperty(ClientProperties.MAX_DELIVERY_RECORDS_PROP_NAME, Integer.toString(_maxRecords)); + + try + { + _connection = new AMQConnection(url); + _session = _connection.createSession(_transacted, _sessionType); + _queue = _session.createQueue(_queueName); + _connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + _log.error("jms exception received", e); + System.exit(0); + } + }); + } + catch (Exception e) + { + _log.error("Unable to setup connection, client and producer on broker", e); + throw new RuntimeException(e); + } + } + + public abstract void start() throws Exception; + + public Integer call() throws Exception { + start(); + return -1; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java new file mode 100644 index 0000000000..343c4134a0 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Config.java @@ -0,0 +1,29 @@ +package org.apache.qpid.perftests.dlq.client; + +public interface Config +{ + String BROKER = "broker"; + String MAX_REDELIVERY = "maxRedelivery"; + String MAX_PREFETCH = "maxPrefetch"; + String SESSION = "session"; + String QUEUE = "queue"; + String PERSISTENT = "persistent"; + String COUNT = "count"; + String SIZE = "size"; + String MESSAGE_IDS = "messageIds"; + String THREADS = "threads"; + String MAX_RECORDS = "maxRecords"; + String LISTENER = "listener"; + String REJECT = "reject"; + String REJECT_COUNT = "rejectCount"; + String REPEAT = "repeat"; + + String SESSION_TRANSACTED = "SESSION_TRANSACTED"; + String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE"; + String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE"; + String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE"; + + String[] SESSION_VALUES = new String[] { + SESSION_TRANSACTED, AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE + }; +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java new file mode 100644 index 0000000000..310591b85b --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Create.java @@ -0,0 +1,58 @@ +package org.apache.qpid.perftests.dlq.client; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.qpid.url.BindingURL; + +public class Create extends Client +{ + private MessageConsumer _consumer; + + public Create(Properties props) + { + super(props); + } + + public void init() + { + super.init(); + + _sessionType = Session.AUTO_ACKNOWLEDGE; + _transacted = false; + _clientAck = false; + } + + public void start() throws Exception + { + _connection.start(); + + BindingURL burl = new AMQBindingURL("direct://amq.direct//" + _queueName + "?maxdeliverycount='" + _maxRedelivery + "'"); + _queue = new AMQQueue(burl); + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put(AMQQueueFactory.X_QPID_DLQ_ENABLED.asString(), true); + + ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, false, false, arguments); + ((AMQSession<?,?>) _session).declareAndBind((AMQDestination) new AMQQueue("amq.direct", _queueName)); + + _consumer = _session.createConsumer(_queue); + while (_consumer.receive(1000) != null); + _consumer.close(); + + _queue = _session.createQueue(_queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + _consumer = _session.createConsumer(_queue); + while (_consumer.receive(1000) != null); + } +} + diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java new file mode 100644 index 0000000000..1eb3c47983 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java @@ -0,0 +1,161 @@ +package org.apache.qpid.perftests.dlq.client; + +import static org.apache.qpid.perftests.dlq.client.Config.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; + +import org.apache.qpid.perftests.dlq.test.PerformanceTest; + + + +public class Receiver extends Client +{ + private MessageConsumer _consumer; + private boolean _listener; + private int _reject; + private int _rejectCount; + private Map<Integer, Integer> _rejected = new HashMap<Integer, Integer>(); + + private static volatile boolean _stopped; + private static CountDownLatch _finished; + private static AtomicInteger _id; + private static AtomicInteger _received; + + public Receiver(Properties props) + { + super(props); + + _client = String.format("%04d", _id.incrementAndGet()); + } + + public static void reset() + { + _id = new AtomicInteger(0); + _received = new AtomicInteger(0); + _finished = new CountDownLatch(1); + _stopped = false; + } + + public void start() throws Exception + { + _listener = Boolean.parseBoolean(_props.getProperty(LISTENER)); + _reject = Integer.parseInt(_props.getProperty(REJECT)); + _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT)); + + _consumer = _session.createConsumer(_queue); + + _connection.start(); + } + + public void startListener() throws Exception + { + _consumer.setMessageListener(new MessageListener() + { + public void onMessage(Message msg) + { + processMessage(msg); + } + }); + } + + public void startReceiver() throws Exception + { + while (!_stopped) + { + Message msg = _consumer.receive(1000); + processMessage(msg); + } + } + + public void processMessage(Message msg) + { + try + { + _received.incrementAndGet(); + int number = msg.getIntProperty("number"); + if (number % 100 == 0) + { + _log.info("client " + _client + " got message " + number); + } + + boolean rejectMessage = (number % _reject) == 0; + if (rejectMessage) + { + int rejectCount = 0; + if (_rejected.containsKey(number)) + { + rejectCount = _rejected.get(number); + } + _rejected.put(number, ++rejectCount); + if (rejectCount <= _rejectCount) + { + if (rejectCount >= _maxRedelivery) + { + _log.info("rejecting message (" + rejectCount + ") " + msg.getJMSMessageID()); + } + if (_transacted) + { + _session.rollback(); + } + else + { + _session.recover(); + } + } + else + { + rejectMessage = false; + } + } + + if (!rejectMessage) + { + if (_transacted) + { + _session.commit(); + } + else if (_clientAck) + { + msg.acknowledge(); + } + if (number == (_count - 1)) + { + _stopped = true; + _finished.countDown(); + } + } + } + catch (Exception e) + { + _log.error("failed to process message", e); + _stopped = true; + _finished.countDown(); + } + } + + public Integer call() throws Exception + { + start(); + + if (_listener) + { + startListener(); + } + else + { + startReceiver(); + } + + _finished.await(); + PerformanceTest.countDown(); + return _received.get(); + } +}
\ No newline at end of file diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java new file mode 100644 index 0000000000..e6c2ff49e2 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Sender.java @@ -0,0 +1,94 @@ +package org.apache.qpid.perftests.dlq.client; + +import java.util.Properties; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class Sender extends Client +{ + public Sender(Properties props) + { + super(props); + } + + private MessageProducer _producer; + + public void init() + { + super.init(); + + _sessionType = Session.AUTO_ACKNOWLEDGE; + _transacted = false; + _clientAck = false; + } + + public Integer call() throws Exception { + start(); + + Message msg = createMessage(); + int sent = sendMessages(msg); + + return Integer.valueOf(sent); + } + + public void start() throws Exception + { + _producer = _session.createProducer(_queue); + _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + _producer.setDisableMessageID(_messageIds); + + _connection.start(); + } + + public Message createMessage() + { + // Setup the message to send + TextMessage msg; + try + { + byte[] bytes = new byte[_count]; + for (int b = 0; b < bytes.length; b++) + { + bytes[b] = (byte) "ABCDEFGHIJKLMNOPQRSTUVWXYZ".charAt(b % 26); + } + String content = new String(bytes); + //Now create the actual message you want to send + msg = _session.createTextMessage(content); + return msg; + } + catch (JMSException e) + { + _log.error("Unable to create message", e); + throw new RuntimeException(e); + } + } + + public int sendMessages(Message msg) + { + for (int i = 0; i < _count; i++) + { + if (i % 100 == 0) + { + _log.info("message " + i); + } + try + { + msg.setIntProperty("number", i); + _producer.send(msg); + } + catch (JMSException e) + { + _log.error("jms exception in sender", e); + return i; + } + } + + return _count; + } +} + diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java new file mode 100644 index 0000000000..3db257362c --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java @@ -0,0 +1,128 @@ +package org.apache.qpid.perftests.dlq.test; + +import static org.apache.qpid.perftests.dlq.client.Config.*; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerformanceFramework +{ + private static final Logger _log = LoggerFactory.getLogger(PerformanceFramework.class); + private static final String _date = new SimpleDateFormat("yyyyMMdd").format(new Date()); + + private static File _dir; + + private Properties _props; + private int _id = 0; + + public PerformanceFramework(File propertyFile) + { + try + { + InputStream input = new FileInputStream(propertyFile); + _props = new Properties(); + _props.load(input); + } + catch (IOException ioe) + { + throw new RuntimeException("file error with " + propertyFile.getName()); + } + } + + public PerformanceFramework(Properties props) + { + _props = props; + } + + public void runAll(File file) throws Exception + { + _log.info("starting test framework"); + PrintStream out = new PrintStream(new FileOutputStream(file)); + out.println("id,maxRedelivery,rejectCount,messageIds,listener,session"); + for (int maxRedelivery = 0; maxRedelivery < 4; maxRedelivery++) + { + _props.setProperty(MAX_REDELIVERY, Integer.toString(maxRedelivery)); + for (int rejectCount = 0; rejectCount < maxRedelivery + 1; rejectCount++) + { + _props.setProperty(REJECT_COUNT, Integer.toString(rejectCount)); + for (int messageIds = 0; messageIds < 2; messageIds++) + { + _props.setProperty(MESSAGE_IDS, Boolean.toString(messageIds == 0)); + for (int listener = 0; listener < 2; listener++) + { + _props.setProperty(LISTENER, Boolean.toString(listener == 0)); + for (int session = 0; session < 4; session++) + { + _props.setProperty(SESSION, SESSION_VALUES[session]); + _id++; + out.println(_id + "," + maxRedelivery + "," + rejectCount + "," + + Boolean.toString(messageIds == 0) + "," + Boolean.toString(listener == 0) + + "," + SESSION_VALUES[session]); + runOnce(_id); + } + } + } + } + } + } + + public void runOnce(int id) + { + PerformanceStatistics stats = new PerformanceStatistics(_props); + try + { + _log.info("starting test id " + id); + String fileId = String.format("%04d", id); + stats.series(new File(_dir, fileId + "-series.csv")); + _log.info("test id " + id + " completed ok"); + stats.statistics(new File(_dir, fileId + "-statistics.csv")); + } + catch (Exception e) + { + _log.error("failed test id " + id + " with error", e); + } + } + + public static void main(String[] argv) throws Exception + { + if (argv.length != 1) + { + throw new IllegalArgumentException("must pass name of property file as argument"); + } + + File propertyFile = new File(argv[0]); + if (!propertyFile.exists() || !propertyFile.canRead()) + { + throw new RuntimeException("property file '" + propertyFile.getAbsolutePath() + "' must exist and be readable"); + } + + int dirId = 0; + while (true) + { + _dir = new File(String.format("%s-%02d", _date, dirId++)); + if (_dir.exists()) + { + continue; + } + else + { + _dir.mkdir(); + break; + } + } + + PerformanceFramework framework = new PerformanceFramework(propertyFile); + framework.runAll(new File(_dir, "framework.csv")); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java new file mode 100644 index 0000000000..a3f6a3fcba --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java @@ -0,0 +1,132 @@ +package org.apache.qpid.perftests.dlq.test; + +import static org.apache.qpid.perftests.dlq.client.Config.*; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerformanceStatistics +{ + private static final Logger _log = LoggerFactory.getLogger(PerformanceStatistics.class); + + private Properties _props; + private List<Double> _sent = new ArrayList<Double>(); + private List<Double> _received = new ArrayList<Double>(); + private List<Double> _rejected = new ArrayList<Double>(); + private List<Double> _duration = new ArrayList<Double>(); + private List<Double> _throughputIn = new ArrayList<Double>(); + private List<Double> _throughputOut = new ArrayList<Double>(); + private List<Double> _bandwidthIn = new ArrayList<Double>(); + private List<Double> _bandwidthOut = new ArrayList<Double>(); + private List<Double> _latency = new ArrayList<Double>(); + private List<Statistics> _statistics = new ArrayList<Statistics>(); + + public PerformanceStatistics(File propertyFile) + { + try + { + InputStream input = new FileInputStream(propertyFile); + _props = new Properties(); + _props.load(input); + } + catch (IOException ioe) + { + throw new RuntimeException("file error with " + propertyFile.getName()); + } + } + + public PerformanceStatistics(Properties props) + { + _props = props; + } + + public void single(PrintStream out) throws Exception + { + PerformanceTest client = new PerformanceTest(_props); + client.test(); + client.check(out); + _sent.add(client.getSent()); + _received.add(client.getReceived()); + _rejected.add(client.getRejected()); + _duration.add(client.getDuration()); + _throughputIn.add(client.getThroughputIn()); + _throughputOut.add(client.getThroughputOut()); + _bandwidthIn.add(client.getBandwidthIn()); + _bandwidthOut.add(client.getBandwidthOut()); + _latency.add(client.getLatency()); + } + + public void series(File file) throws Exception + { + try + { + PrintStream out = new PrintStream(new FileOutputStream(file)); + out.println(PerformanceTest.getHeader()); + int repeat = Integer.parseInt(_props.getProperty(REPEAT)); + for (int i = 0; i < repeat; i++) + { + _log.info("starting individual test run " + i); + single(out); + } + } + catch (Exception e) + { + throw new RuntimeException("error running test series", e); + } + + _statistics.add(new Statistics(_sent, "sent")); + _statistics.add(new Statistics(_received, "received")); + _statistics.add(new Statistics(_rejected, "rejected")); + _statistics.add(new Statistics(_duration, "duration")); + _statistics.add(new Statistics(_throughputIn, "throughputIn")); + _statistics.add(new Statistics(_throughputOut, "throughputOut")); + _statistics.add(new Statistics(_bandwidthIn, "bandwidthIn")); + _statistics.add(new Statistics(_bandwidthOut, "bandwidthOut")); + _statistics.add(new Statistics(_latency, "latency")); + } + + public void statistics(File file) + { + try + { + PrintStream out = new PrintStream(new FileOutputStream(file)); + out.println(Statistics.getHeader()); + for (Statistics stats : _statistics) + { + out.println(stats.toString()); + } + } + catch (Exception e) + { + throw new RuntimeException("error outputting stats", e); + } + } + + public static void main(String[] argv) throws Exception + { + if (argv.length != 1) + { + throw new IllegalArgumentException("must pass name of property file as argument"); + } + + File propertyFile = new File(argv[0]); + if (!propertyFile.exists() || !propertyFile.canRead()) + { + throw new RuntimeException("property file '" + propertyFile.getAbsolutePath() + "' must exist and be readable"); + } + + PerformanceStatistics stats = new PerformanceStatistics(propertyFile); + stats.series(new File("series.csv")); + stats.statistics(new File("statistics.csv")); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java new file mode 100644 index 0000000000..a8c7e7a905 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java @@ -0,0 +1,237 @@ +package org.apache.qpid.perftests.dlq.test; + +import static org.apache.qpid.perftests.dlq.client.Config.*; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.qpid.perftests.dlq.client.Check; +import org.apache.qpid.perftests.dlq.client.Create; +import org.apache.qpid.perftests.dlq.client.Receiver; +import org.apache.qpid.perftests.dlq.client.Sender; + + +public class PerformanceTest +{ + private static CountDownLatch _latch; + + private ExecutorService _executor; + private Properties _props; + private int _size = 0; + private int _threads = 0; + private int _sent = 0; + private int _received = 0; + private int _rejected = 0; + private long _started = 0; + private long _finished = 0; + + private String _session; + private int _count; + private int _reject; + private int _rejectCount; + private int _maxRedelivery; + private boolean _messageIds; + private boolean _listener; + + public PerformanceTest(File propertyFile) + { + try + { + InputStream input = new FileInputStream(propertyFile); + _props = new Properties(); + _props.load(input); + } + catch (IOException ioe) + { + throw new RuntimeException("file error with " + propertyFile.getName()); + } + } + + public PerformanceTest(Properties props) + { + _props = props; + } + + public void test() throws Exception + { + Create create = new Create(_props); + create.connect(); + create.call(); + create.shutdown(); + + _executor = Executors.newCachedThreadPool(); + _threads = Integer.parseInt(_props.getProperty(THREADS)); + _size = Integer.parseInt(_props.getProperty(SIZE)); + _count = Integer.parseInt(_props.getProperty(COUNT)); + _reject = Integer.parseInt(_props.getProperty(REJECT)); + _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT)); + _maxRedelivery = Integer.parseInt(_props.getProperty(MAX_REDELIVERY)); + _session = _props.getProperty(SESSION); + _listener = Boolean.parseBoolean(_props.getProperty(LISTENER)); + _messageIds = Boolean.parseBoolean(_props.getProperty(MESSAGE_IDS)); + _latch = new CountDownLatch(1); + _started = System.nanoTime(); + + Sender sender = new Sender(_props); + sender.connect(); + Future<Integer> send = _executor.submit(sender); + + Receiver.reset(); + List<Future<Integer>> receives = new ArrayList<Future<Integer>>(); + List<Receiver> receivers = new ArrayList<Receiver>(); + for (int i = 0; i < _threads; i++) + { + Receiver receiver = new Receiver(_props); + receiver.connect(); + receivers.add(receiver); + receives.add(_executor.submit(receiver)); + } + + try + { + _latch.await(); + _finished = System.nanoTime(); + _sent = send.get(); + for (Future<Integer> receive : receives) + { + _received += receive.get(); + } + + Check check = new Check(_props); + check.connect(); + _rejected = check.call(); + check.shutdown(); + } + catch (Exception e) + { + throw new RuntimeException("error running tests", e); + } + finally + { + sender.shutdown(); + for (Receiver receiver : receivers) + { + receiver.shutdown(); + } + _executor.shutdownNow(); + } + } + + public void check(PrintStream out) + { + StringBuilder error = new StringBuilder(); + if (_sent != _count) + { + error.append("sent ").append(_sent).append(" not ").append(_count).append('\n'); + } + boolean sessionOk = ((_session.equalsIgnoreCase(CLIENT_ACKNOWLEDGE)) || (_session.equalsIgnoreCase(SESSION_TRANSACTED)) || + ((_session.equalsIgnoreCase(AUTO_ACKNOWLEDGE) || _session.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE)) && _listener)); + int rejected = (!sessionOk || !_messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject; + if (_rejected != rejected) + { + error.append("rejected ").append(_rejected).append(" not ").append(rejected).append('\n'); + } + int received = (_count - rejected) + (sessionOk ? ((_count / _reject) * _rejectCount) : 0); + if (_received != received) + { + error.append("received ").append(_received).append(" not ").append(received).append('\n'); + } + if (error.length() > 0) + { + out.print(error.toString()); + } + else + { + out.println(toString()); + } + } + + public static String getHeader() + { + return "sent,received,rejected,duration"; + } + + public String toString() + { + String results = String.format("%d,%d,%d,%f", _sent, _received, _rejected, getDuration()); + return results; + } + + public double getSent() + { + return (double) _sent; + } + + public double getReceived() + { + return (double) _received; + } + + public double getDuration() + { + return ((double) _finished - (double) _started) / 1000000000.0d; + } + + public double getRejected() + { + return (double) _rejected; + } + + public double getThroughputIn() + { + return getSent() / getDuration(); + } + + public double getThroughputOut() + { + return getReceived() / getDuration(); + } + + public double getBandwidthIn() + { + return (getSent() * (double) _size) / getDuration(); + } + + public double getBandwidthOut() + { + return (getReceived() * (double) _size) / getDuration(); + } + + public double getLatency() + { + return getDuration() / getReceived(); + } + + public static void countDown() + { + _latch.countDown(); + } + + public static void main(String[] argv) throws Exception + { + if (argv.length != 1) + { + throw new IllegalArgumentException("must pass name of propert file as argument"); + } + + File propertyFile = new File(argv[0]); + if (!propertyFile.exists() || !propertyFile.canRead()) + { + throw new RuntimeException("property file '" + propertyFile.getName() + "' must exist and be readable"); + } + PerformanceTest client = new PerformanceTest(propertyFile); + client.test(); + client.check(System.out); + } +} + diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/Statistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/Statistics.java new file mode 100644 index 0000000000..4804aaaa42 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/Statistics.java @@ -0,0 +1,106 @@ +package org.apache.qpid.perftests.dlq.test; + +import java.util.Arrays; +import java.util.List; + +public class Statistics +{ + private String _data; + private double _mean; + private double _standardDeviation; + private double _tValue; + private double _min = Double.MAX_VALUE; + private double _max = Double.MIN_VALUE; + + private List<Double> tDistribution95 = Arrays.asList( + 12.70620473d, 4.30265273d, 3.182446305d, 2.776445105d, 2.570581835d, + 2.446911846d, 2.364624251d, 2.306004133d, 2.262157158d, 2.228138842d, + 2.200985159d, 2.178812827d, 2.160368652d, 2.144786681d, 2.131449536d, + 2.119905285d, 2.109815559d, 2.100922037d, 2.09302405d, 2.085963441d, + 2.079613837d, 2.073873058d, 2.068657599d, 2.063898547d, 2.059538536d + ); + + public Statistics(List<Double> samples, String data) + { + _data = data; + + double n = (double) samples.size(); + double total = 0.0d; + for (Double s : samples) + { + total += s; + _min = Math.min(_min, s); + _max = Math.max(_max, s); + } + _mean = total / n; + double deviationSquared = 0.0d; + for (Double s : samples) + { + double deviation = s - _mean; + deviationSquared += (deviation * deviation); + } + _standardDeviation = Math.sqrt(deviationSquared / (n - 1)); + _tValue = tDistribution95.get((int) (n - 2)) * (_standardDeviation / Math.sqrt(n)); + } + + public String getData() + { + return _data; + } + + public double getMean() + { + return _mean; + } + + public double getMin() + { + return _min; + } + + public double getMax() + { + return _max; + } + + public double getStandardDeviation() + { + return _standardDeviation; + } + + public double getIntervalMin() + { + return _mean - _tValue; + } + + public double getIntervalMax() + { + return _mean + _tValue; + } + + public double getInterval() + { + return 2.0d + _tValue; + } + + public double getRange() + { + return getMax() - getMin(); + } + + public static String getHeader() + { + return "data,mean,min,max,range,stdev,min95,max95,int95"; + } + + public String toString() + { + String results = String.format("%s,%f,%f,%f,%f,%f,%f,%f,%f", + _data, + getMean(), + getMin(), getMax(), getRange(), + getStandardDeviation(), + getIntervalMin(), getIntervalMax(), getInterval()); + return results; + } +} |