summaryrefslogtreecommitdiff
path: root/java/perftests/src/main/java/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/main/java/org/apache')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java6
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java11
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java84
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/ResultsFileWriter.java112
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java17
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java10
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java56
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java36
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java21
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java6
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/db/ResultsDbWriter.java467
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java64
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java101
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java11
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java11
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java42
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java54
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java4
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java11
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java24
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormatter.java (renamed from java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java)6
28 files changed, 1017 insertions, 163 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java b/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java
index 8c1f8675e3..e962bfe799 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java
@@ -34,10 +34,14 @@ public class ArgumentParser
throw new IllegalArgumentException("arguments must have format <name>=<value>: " + arg);
}
- if(initialValues.put(splitArg[0], splitArg[1]) == null)
+
+ String argumentKey = splitArg[0];
+ String argumentValue = splitArg[1];
+ if(!initialValues.containsKey(argumentKey))
{
throw new IllegalArgumentException("not a valid configuration property: " + arg);
}
+ initialValues.put(argumentKey, argumentValue);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java b/java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java
index fb4c1b700b..ee374e180d 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/ConfigFileHelper.java
@@ -60,15 +60,4 @@ public class ConfigFileHelper
return testConfigFile;
}
-
- /**
- * generateOutputCsvNameFrom("/config/testConfigFile.js", "/output") returns /output/testConfigFile.csv
- */
- public String generateOutputCsvNameFrom(String testConfigFile, String outputDir)
- {
- final String filenameOnlyWithExtension = new File(testConfigFile).getName();
- final String cvsFile = filenameOnlyWithExtension.replaceFirst(".?\\w*$", ".csv");
-
- return new File(outputDir, cvsFile).getAbsolutePath();
- }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java b/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
index aea0ea301a..449130a328 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
@@ -19,9 +19,9 @@
*/
package org.apache.qpid.disttest;
+import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import javax.naming.Context;
@@ -30,9 +30,9 @@ import org.apache.qpid.disttest.controller.Controller;
import org.apache.qpid.disttest.controller.ResultsForAllTests;
import org.apache.qpid.disttest.controller.config.Config;
import org.apache.qpid.disttest.controller.config.ConfigReader;
+import org.apache.qpid.disttest.db.ResultsDbWriter;
import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
import org.apache.qpid.disttest.results.aggregation.Aggregator;
-import org.apache.qpid.disttest.results.formatting.CSVFormater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,20 +43,29 @@ public class ControllerRunner extends AbstractRunner
public static final String TEST_CONFIG_PROP = "test-config";
public static final String DISTRIBUTED_PROP = "distributed";
public static final String OUTPUT_DIR_PROP = "outputdir";
+ public static final String WRITE_TO_DB = "writeToDb";
+ public static final String RUN_ID = "runId";
private static final String TEST_CONFIG_DEFAULT = "perftests-config.json";
private static final String DISTRIBUTED_DEFAULT = "false";
private static final String OUTPUT_DIR_DEFAULT = ".";
+ public static final String WRITE_TO_DB_DEFAULT = "false";
private final Aggregator _aggregator = new Aggregator();
private final ConfigFileHelper _configFileHelper = new ConfigFileHelper();
+ private ResultsFileWriter _resultsFileWriter;
+
+ private ResultsDbWriter _resultsDbWriter;
+
public ControllerRunner()
{
getCliOptions().put(TEST_CONFIG_PROP, TEST_CONFIG_DEFAULT);
getCliOptions().put(DISTRIBUTED_PROP, DISTRIBUTED_DEFAULT);
getCliOptions().put(OUTPUT_DIR_PROP, OUTPUT_DIR_DEFAULT);
+ getCliOptions().put(WRITE_TO_DB, WRITE_TO_DB_DEFAULT);
+ getCliOptions().put(RUN_ID, null);
}
public static void main(String[] args) throws Exception
@@ -69,6 +78,8 @@ public class ControllerRunner extends AbstractRunner
public void runController() throws Exception
{
Context context = getContext();
+ setUpResultFilesWriter();
+ setUpResultsDbWriter();
ControllerJmsDelegate jmsDelegate = new ControllerJmsDelegate(context);
@@ -82,6 +93,24 @@ public class ControllerRunner extends AbstractRunner
}
}
+ private void setUpResultsDbWriter()
+ {
+ String writeToDbStr = getCliOptions().get(WRITE_TO_DB);
+ if(Boolean.valueOf(writeToDbStr))
+ {
+ String runId = getCliOptions().get(RUN_ID);
+ _resultsDbWriter = new ResultsDbWriter(getContext(), runId);
+ _resultsDbWriter.createResultsTableIfNecessary();
+ }
+ }
+
+ void setUpResultFilesWriter()
+ {
+ String outputDirString = getCliOptions().get(ControllerRunner.OUTPUT_DIR_PROP);
+ File outputDir = new File(outputDirString);
+ _resultsFileWriter = new ResultsFileWriter(outputDir);
+ }
+
private void runTests(ControllerJmsDelegate jmsDelegate)
{
Controller controller = new Controller(jmsDelegate, DistributedTestConstants.REGISTRATION_TIMEOUT, DistributedTestConstants.COMMAND_RESPONSE_TIMEOUT);
@@ -92,6 +121,8 @@ public class ControllerRunner extends AbstractRunner
try
{
+ List<ResultsForAllTests> results = new ArrayList<ResultsForAllTests>();
+
for (String testConfigFile : testConfigFiles)
{
final Config testConfig = buildTestConfigFrom(testConfigFile);
@@ -100,8 +131,11 @@ public class ControllerRunner extends AbstractRunner
controller.awaitClientRegistrations();
LOGGER.info("Running test : " + testConfigFile);
- runTest(controller, testConfigFile);
+ ResultsForAllTests testResult = runTest(controller, testConfigFile);
+ results.add(testResult);
}
+
+ _resultsFileWriter.writeResultsSummary(results);
}
catch(Exception e)
{
@@ -113,7 +147,7 @@ public class ControllerRunner extends AbstractRunner
}
}
- private void runTest(Controller controller, String testConfigFile)
+ private ResultsForAllTests runTest(Controller controller, String testConfigFile)
{
final Config testConfig = buildTestConfigFrom(testConfigFile);
controller.setConfig(testConfig);
@@ -121,9 +155,13 @@ public class ControllerRunner extends AbstractRunner
ResultsForAllTests rawResultsForAllTests = controller.runAllTests();
ResultsForAllTests resultsForAllTests = _aggregator.aggregateResults(rawResultsForAllTests);
- String outputDir = getCliOptions().get(ControllerRunner.OUTPUT_DIR_PROP);
- final String outputFile = _configFileHelper.generateOutputCsvNameFrom(testConfigFile, outputDir);
- writeResultsToFile(resultsForAllTests, outputFile);
+ _resultsFileWriter.writeResultsToFile(resultsForAllTests, testConfigFile);
+ if(_resultsDbWriter != null)
+ {
+ _resultsDbWriter.writeResults(resultsForAllTests);
+ }
+
+ return resultsForAllTests;
}
private void createClientsIfNotDistributed(final List<String> testConfigFiles)
@@ -148,36 +186,6 @@ public class ControllerRunner extends AbstractRunner
}
}
- private void writeResultsToFile(ResultsForAllTests resultsForAllTests, String outputFile)
- {
- FileWriter writer = null;
- try
- {
- final String outputCsv = new CSVFormater().format(resultsForAllTests);
- writer = new FileWriter(outputFile);
- writer.write(outputCsv);
- LOGGER.info("Wrote " + resultsForAllTests.getTestResults().size() + " test result(s) to output file " + outputFile);
- }
- catch (IOException e)
- {
- throw new DistributedTestException("Unable to write output file " + outputFile, e);
- }
- finally
- {
- if (writer != null)
- {
- try
- {
- writer.close();
- }
- catch (IOException e)
- {
- LOGGER.error("Failed to close stream for file " + outputFile, e);
- }
- }
- }
- }
-
private Config buildTestConfigFrom(String testConfigFile)
{
ConfigReader configReader = new ConfigReader();
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/ResultsFileWriter.java b/java/perftests/src/main/java/org/apache/qpid/disttest/ResultsFileWriter.java
new file mode 100644
index 0000000000..81b717403d
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/ResultsFileWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.qpid.disttest.controller.ResultsForAllTests;
+import org.apache.qpid.disttest.results.aggregation.TestResultAggregator;
+import org.apache.qpid.disttest.results.formatting.CSVFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResultsFileWriter
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ResultsFileWriter.class);
+
+ static final String TEST_SUMMARY_FILE_NAME = "test-summary.csv";
+
+ private final File _outputDir;
+
+ private CSVFormatter _csvFormater = new CSVFormatter();
+
+ private TestResultAggregator _testResultAggregator = new TestResultAggregator();
+
+ public ResultsFileWriter(File outputDir)
+ {
+ _outputDir = outputDir;
+ }
+
+ public void writeResultsToFile(ResultsForAllTests resultsForAllTests, String testConfigFile)
+ {
+ final String outputFile = generateOutputCsvNameFrom(testConfigFile);
+ writeResultsToOutputFile(resultsForAllTests, outputFile);
+ }
+
+ public void writeResultsSummary(List<ResultsForAllTests> allResultsList)
+ {
+ ResultsForAllTests combinedResults = _testResultAggregator.aggregateTestResults(allResultsList);
+ writeResultsToOutputFile(combinedResults, new File(_outputDir, TEST_SUMMARY_FILE_NAME).getAbsolutePath());
+ }
+
+ /**
+ * generateOutputCsvNameFrom("/config/testConfigFile.js", "/output") returns /output/testConfigFile.csv
+ */
+ private String generateOutputCsvNameFrom(String testConfigFile)
+ {
+ final String filenameOnlyWithExtension = new File(testConfigFile).getName();
+ final String cvsFile = filenameOnlyWithExtension.replaceFirst(".?\\w*$", ".csv");
+
+ return new File(_outputDir, cvsFile).getAbsolutePath();
+ }
+
+ private void writeResultsToOutputFile(ResultsForAllTests resultsForAllTests, String outputFile)
+ {
+ FileWriter writer = null;
+ try
+ {
+ final String outputCsv = _csvFormater.format(resultsForAllTests);
+ writer = new FileWriter(outputFile);
+ writer.write(outputCsv);
+ LOGGER.info("Wrote " + resultsForAllTests.getTestResults().size() + " test result(s) to output file " + outputFile);
+ }
+ catch (IOException e)
+ {
+ throw new DistributedTestException("Unable to write output file " + outputFile, e);
+ }
+ finally
+ {
+ if (writer != null)
+ {
+ try
+ {
+ writer.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.error("Failed to close stream for file " + outputFile, e);
+ }
+ }
+ }
+ }
+
+ void setCsvFormater(CSVFormatter csvFormater)
+ {
+ _csvFormater = csvFormater;
+ }
+
+ void setTestResultAggregator(TestResultAggregator testResultAggregator)
+ {
+ _testResultAggregator = testResultAggregator;
+ }
+
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
index f9d50e8e64..d3a5e30191 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
@@ -35,7 +35,6 @@ import javax.jms.MessageListener;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
-import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.CreateConsumerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.slf4j.Logger;
@@ -103,16 +102,22 @@ public class ConsumerParticipant implements Participant
}
Date end = new Date();
- int numberOfMessagesSent = _totalNumberOfMessagesReceived.get();
+ int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get();
long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);
- ConsumerParticipantResult result = _resultFactory.createForConsumer(
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Consumer {} finished consuming. Number of messages consumed: {}",
+ getName(), numberOfMessagesReceived);
+ }
+
+ ParticipantResult result = _resultFactory.createForConsumer(
getName(),
registeredClientName,
_command,
acknowledgeMode,
- numberOfMessagesSent,
+ numberOfMessagesReceived,
payloadSize,
totalPayloadSize,
start, end, _messageLatencies);
@@ -174,7 +179,7 @@ public class ConsumerParticipant implements Participant
{
LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
}
- _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
+ _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message);
}
}
@@ -199,7 +204,7 @@ public class ConsumerParticipant implements Participant
}
// commit/acknowledge remaining messages if necessary
- _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
+ _jmsDelegate.commitOrAcknowledgeMessageIfNecessary(_command.getSessionName(), message);
}
return false;
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
index bb9ce26f7e..10f62708a4 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
@@ -108,8 +108,16 @@ public class ParticipantExecutor
}
finally
{
+ try
+ {
+ _participant.releaseResources();
+ }
+ catch(Exception e)
+ {
+ LOGGER.error("Participant " + _participant + " unable to release resources", e);
+ }
+
_client.sendResults(result);
- _participant.releaseResources();
}
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
index 63cbe98b5c..a9da837dea 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
@@ -58,17 +58,25 @@ public class ProducerParticipant implements Participant
@Override
public ParticipantResult doIt(String registeredClientName) throws Exception
{
- if (_command.getMaximumDuration() == 0 && _command.getNumberOfMessages() == 0)
+ long numberOfMessages = _command.getNumberOfMessages();
+ long maximumDuration = _command.getMaximumDuration();
+
+ if (maximumDuration == 0 && numberOfMessages == 0)
{
throw new DistributedTestException("number of messages and duration cannot both be zero");
}
- int acknowledgeMode = _jmsDelegate.getAcknowledgeMode(_command.getSessionName());
+ long duration = maximumDuration - _command.getStartDelay();
+ if (maximumDuration > 0 && duration <= 0)
+ {
+ throw new DistributedTestException("Start delay must be less than maximum test duration");
+ }
+ final long requiredDuration = duration > 0 ? duration : 0;
doSleepForStartDelay();
- final long requiredDuration = _command.getMaximumDuration() - _command.getStartDelay();
-
+ final int batchSize = _command.getBatchSize();
+ final int acknowledgeMode = _jmsDelegate.getAcknowledgeMode(_command.getSessionName());
final long startTime = System.currentTimeMillis();
Message lastPublishedMessage = null;
@@ -78,10 +86,20 @@ public class ProducerParticipant implements Participant
_limiter = ExecutorWithLimitsFactory.createExecutorWithLimit(startTime, requiredDuration);
- LOGGER.info("Producer {} about to send messages", getName());
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Producer {} about to send messages. Duration limit: {} ms, Message limit: {}",
+ new Object[]{getName(), requiredDuration, numberOfMessages});
+ }
while (true)
{
+ if (numberOfMessages > 0 && numberOfMessagesSent >= numberOfMessages
+ || requiredDuration > 0 && System.currentTimeMillis() - startTime >= requiredDuration)
+ {
+ break;
+ }
+
try
{
lastPublishedMessage = _limiter.execute(new Callable<Message>()
@@ -110,35 +128,35 @@ public class ProducerParticipant implements Participant
LOGGER.trace("message " + numberOfMessagesSent + " sent by " + this);
}
- final boolean batchLimitReached = _command.getBatchSize() <= 0
- || numberOfMessagesSent % _command.getBatchSize() == 0;
+ final boolean batchLimitReached = batchSize <= 0
+ || numberOfMessagesSent % batchSize == 0;
if (batchLimitReached)
{
- if (LOGGER.isTraceEnabled() && _command.getBatchSize() > 0)
+ if (LOGGER.isTraceEnabled() && batchSize > 0)
{
- LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ LOGGER.trace("Committing: batch size " + batchSize );
}
- _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
+ _jmsDelegate.commitIfNecessary(_command.getSessionName());
doSleepForInterval();
}
-
- if (_command.getNumberOfMessages() > 0 && numberOfMessagesSent >= _command.getNumberOfMessages()
- || requiredDuration > 0 && System.currentTimeMillis() - startTime >= requiredDuration)
- {
- break;
- }
}
// commit the remaining batch messages
- if (_command.getBatchSize() > 0 && numberOfMessagesSent % _command.getBatchSize() != 0)
+ if (batchSize > 0 && numberOfMessagesSent % batchSize != 0)
{
if (LOGGER.isTraceEnabled())
{
- LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ LOGGER.trace("Committing: batch size " + batchSize );
}
- _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
+ _jmsDelegate.commitIfNecessary(_command.getSessionName());
+ }
+
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Producer {} finished publishing. Number of messages published: {}",
+ getName(), numberOfMessagesSent);
}
Date start = new Date(startTime);
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java
index eaccb54f0e..5a726c50b4 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java
@@ -57,34 +57,54 @@ public class ClientRegistry
return Collections.unmodifiableSet(_registeredClientNames);
}
- public int awaitClients(int numberOfClientsToAwait, long timeout)
+ /**
+ * @return the number of clients that are still absent.
+ */
+ public int awaitClients(final int numberOfClientsToAwait, final long idleTimeout)
{
- final long endTime = System.currentTimeMillis() + timeout;
+ long deadlineForNextRegistration = deadline(idleTimeout);
- int numberOfClientsAbsent = numberOfClientsToAwait - _registeredClientNames.size();
- long remainingTimeout = endTime - System.currentTimeMillis();
+ int numberOfClientsAbsent = numberAbsent(numberOfClientsToAwait);
- while(numberOfClientsAbsent > 0 && remainingTimeout > 0)
+ while(numberOfClientsAbsent > 0 && System.currentTimeMillis() < deadlineForNextRegistration)
{
synchronized (_lock)
{
try
{
- _lock.wait(remainingTimeout);
+ _lock.wait(idleTimeout);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
+ return numberOfClientsAbsent;
}
}
- numberOfClientsAbsent = numberOfClientsToAwait - _registeredClientNames.size();
- remainingTimeout = endTime - System.currentTimeMillis();
+ int newNumberAbsent = numberAbsent(numberOfClientsToAwait);
+ if(newNumberAbsent < numberOfClientsAbsent)
+ {
+ // a registration was received since the last loop, so reset the timeout
+ deadlineForNextRegistration = deadline(idleTimeout);
+ }
+
+ numberOfClientsAbsent = newNumberAbsent;
}
return numberOfClientsAbsent < 0 ? 0 : numberOfClientsAbsent;
}
+
+ private long deadline(final long idleTimeout)
+ {
+ return System.currentTimeMillis() + idleTimeout;
+ }
+
+ private int numberAbsent(int numberOfClientsToAwait)
+ {
+ return numberOfClientsToAwait - _registeredClientNames.size();
+ }
+
private void notifyAllWaiters()
{
synchronized (_lock)
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java
index 6c5ff3450c..d4474e2c12 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java
@@ -21,7 +21,9 @@ package org.apache.qpid.disttest.controller;
import java.util.ArrayList;
import java.util.List;
+import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.results.aggregation.ITestResult;
+import org.apache.qpid.disttest.results.aggregation.TestResultAggregator;
public class ResultsForAllTests
{
@@ -46,4 +48,23 @@ public class ResultsForAllTests
{
return _hasErrors;
}
+
+ public ResultsForAllTests getAllParticipantsResult()
+ {
+ ResultsForAllTests summaryResultsForAllTests = new ResultsForAllTests();
+
+ for (ITestResult testResult : _results)
+ {
+ for(ParticipantResult participantResult : testResult.getParticipantResults())
+ {
+ if(TestResultAggregator.ALL_CONSUMER_PARTICIPANTS_NAME.equals(participantResult.getParticipantName()))
+ {
+ TestResult summaryTestResult = new TestResult(testResult.getName());
+ summaryTestResult.addParticipantResult(participantResult);
+ summaryResultsForAllTests.add(summaryTestResult);
+ }
+ }
+ }
+ return summaryResultsForAllTests;
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
index 110de8a4ea..dcccccdd5f 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
@@ -24,7 +24,6 @@ import org.apache.qpid.disttest.message.CreateConsumerCommand;
public class ConsumerConfig extends ParticipantConfig
{
- private boolean _isTopic;
private boolean _isDurableSubscription;
private boolean _isBrowsingSubscription;
private String _selector;
@@ -35,7 +34,6 @@ public class ConsumerConfig extends ParticipantConfig
// For Gson
public ConsumerConfig()
{
- _isTopic = false;
_isDurableSubscription = false;
_isBrowsingSubscription = false;
_selector = null;
@@ -56,9 +54,8 @@ public class ConsumerConfig extends ParticipantConfig
boolean noLocal,
boolean synchronous)
{
- super(consumerName, destinationName, numberOfMessages, batchSize, maximumDuration);
+ super(consumerName, destinationName, isTopic, numberOfMessages, batchSize, maximumDuration);
- _isTopic = isTopic;
_isDurableSubscription = isDurableSubscription;
_isBrowsingSubscription = isBrowsingSubscription;
_selector = selector;
@@ -73,7 +70,6 @@ public class ConsumerConfig extends ParticipantConfig
setParticipantProperties(createConsumerCommand);
createConsumerCommand.setSessionName(sessionName);
- createConsumerCommand.setTopic(_isTopic);
createConsumerCommand.setDurableSubscription(_isDurableSubscription);
createConsumerCommand.setBrowsingSubscription(_isBrowsingSubscription);
createConsumerCommand.setSelector(_selector);
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
index 16f7b0d18d..99ae4b7426 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
@@ -33,6 +33,7 @@ public abstract class ParticipantConfig
private boolean _alreadyLoggedAboutOverriddenDuration;
private String _destinationName;
+ private boolean _isTopic;
private long _numberOfMessages;
private String _name;
private int _batchSize;
@@ -51,12 +52,14 @@ public abstract class ParticipantConfig
public ParticipantConfig(
String name,
String destinationName,
+ boolean isTopic,
long numberOfMessages,
int batchSize,
long maximumDuration)
{
_name = name;
_destinationName = destinationName;
+ _isTopic = isTopic;
_numberOfMessages = numberOfMessages;
_batchSize = batchSize;
_maximumDuration = maximumDuration;
@@ -66,6 +69,7 @@ public abstract class ParticipantConfig
{
createParticipantCommand.setParticipantName(_name);
createParticipantCommand.setDestinationName(_destinationName);
+ createParticipantCommand.setTopic(_isTopic);
createParticipantCommand.setNumberOfMessages(_numberOfMessages);
createParticipantCommand.setBatchSize(_batchSize);
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
index f2369ed671..88c188d3ac 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
@@ -59,7 +59,7 @@ public class ProducerConfig extends ParticipantConfig
long startDelay,
String messageProviderName)
{
- super(producerName, destinationName, numberOfMessages, batchSize, maximumDuration);
+ super(producerName, destinationName, false, numberOfMessages, batchSize, maximumDuration);
_deliveryMode = deliveryMode;
_messageSize = messageSize;
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/db/ResultsDbWriter.java b/java/perftests/src/main/java/org/apache/qpid/disttest/db/ResultsDbWriter.java
new file mode 100644
index 0000000000..fdea03ae5e
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/db/ResultsDbWriter.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.db;
+
+import static org.apache.qpid.disttest.message.ParticipantAttribute.ACKNOWLEDGE_MODE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.AVERAGE_LATENCY;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.BATCH_SIZE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.CONFIGURED_CLIENT_NAME;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.DELIVERY_MODE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.ERROR_MESSAGE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSING_SUBSCRIPTION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_DURABLE_SUBSCRIPTION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_NO_LOCAL;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SYNCHRONOUS_CONSUMER;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_TOPIC;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.ITERATION_NUMBER;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.LATENCY_STANDARD_DEVIATION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.MAXIMUM_DURATION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.MAX_LATENCY;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.MIN_LATENCY;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.NUMBER_OF_MESSAGES_PROCESSED;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PARTICIPANT_NAME;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PAYLOAD_SIZE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PRIORITY;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PRODUCER_INTERVAL;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PRODUCER_START_DELAY;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TEST_NAME;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.THROUGHPUT;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TIME_TAKEN;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TIME_TO_LIVE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TOTAL_NUMBER_OF_CONSUMERS;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TOTAL_NUMBER_OF_PRODUCERS;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TOTAL_PAYLOAD_PROCESSED;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Hashtable;
+import java.util.TimeZone;
+
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.log4j.Logger;
+import org.apache.qpid.disttest.controller.ResultsForAllTests;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.results.aggregation.ITestResult;
+
+/**
+ * Intended call sequence:
+ * <ul>
+ * <li>{@link #ResultsDbWriter(Context, String)}</li>
+ * <li>{@link #createResultsTableIfNecessary()}</li>
+ * <li>{@link #writeResults(ResultsForAllTests)} (usually multiple times)</li>
+ * </ul>
+ */
+public class ResultsDbWriter
+{
+ private static final Logger _logger = Logger.getLogger(ResultsDbWriter.class);
+
+ private static final String RESULTS_TABLE_NAME = "RESULTS";
+
+ /** column name */
+ static final String INSERTED_TIMESTAMP = "insertedTimestamp";
+ /** column name */
+ static final String RUN_ID = "runId";
+
+ private static final String TABLE_EXISTENCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
+
+ private static final String CREATE_RESULTS_TABLE = String.format(
+ "CREATE TABLE %1$s (" +
+ "%2$s varchar(200) not null" + // TEST_NAME
+ ", %3$s bigint not null" + // ITERATION_NUMBER
+ ", %4$s varchar(200) not null" + // PARTICIPANT_NAME
+ ", %5$s double not null" + // THROUGHPUT
+ ", %6$s double" + // AVERAGE_LATENCY
+ ", %7$s varchar(200)" + // CONFIGURED_CLIENT_NAME
+ ", %8$s bigint" + // NUMBER_OF_MESSAGES_PROCESSED
+ ", %9$s bigint" + // PAYLOAD_SIZE
+ ", %10$s bigint" + // PRIORITY
+ ", %11$s bigint" + // TIME_TO_LIVE
+ ", %12$s bigint" + // ACKNOWLEDGE_MODE
+ ", %13$s bigint" + // DELIVERY_MODE
+ ", %14$s bigint" + // BATCH_SIZE
+ ", %15$s bigint" + // MAXIMUM_DURATION
+ ", %16$s bigint" + // PRODUCER_START_DELAY
+ ", %17$s bigint" + // PRODUCER_INTERVAL
+ ", %18$s bigint" + // IS_TOPIC
+ ", %19$s bigint" + // IS_DURABLE_SUBSCRIPTION
+ ", %20$s bigint" + // IS_BROWSING_SUBSCRIPTION
+ ", %21$s bigint" + // IS_SELECTOR
+ ", %22$s bigint" + // IS_NO_LOCAL
+ ", %23$s bigint" + // IS_SYNCHRONOUS_CONSUMER
+ ", %24$s bigint" + // TOTAL_NUMBER_OF_CONSUMERS
+ ", %25$s bigint" + // TOTAL_NUMBER_OF_PRODUCERS
+ ", %26$s bigint" + // TOTAL_PAYLOAD_PROCESSED
+ ", %27$s bigint" + // TIME_TAKEN
+ ", %28$s varchar(2000)" + // ERROR_MESSAGE
+ ", %29$s bigint" + // MIN_LATENCY
+ ", %30$s bigint" + // MAX_LATENCY
+ ", %31$s double" + // LATENCY_STANDARD_DEVIATION
+ ", %32$s varchar(200) not null" +
+ ", %33$s timestamp not null" +
+ ")",
+ RESULTS_TABLE_NAME,
+ TEST_NAME.getDisplayName(),
+ ITERATION_NUMBER.getDisplayName(),
+ PARTICIPANT_NAME.getDisplayName(),
+ THROUGHPUT.getDisplayName(),
+ AVERAGE_LATENCY.getDisplayName(),
+ CONFIGURED_CLIENT_NAME.getDisplayName(),
+ NUMBER_OF_MESSAGES_PROCESSED.getDisplayName(),
+ PAYLOAD_SIZE.getDisplayName(),
+ PRIORITY.getDisplayName(),
+ TIME_TO_LIVE.getDisplayName(),
+ ACKNOWLEDGE_MODE.getDisplayName(),
+ DELIVERY_MODE.getDisplayName(),
+ BATCH_SIZE.getDisplayName(),
+ MAXIMUM_DURATION.getDisplayName(),
+ PRODUCER_START_DELAY.getDisplayName(),
+ PRODUCER_INTERVAL.getDisplayName(),
+ IS_TOPIC.getDisplayName(),
+ IS_DURABLE_SUBSCRIPTION.getDisplayName(),
+ IS_BROWSING_SUBSCRIPTION.getDisplayName(),
+ IS_SELECTOR.getDisplayName(),
+ IS_NO_LOCAL.getDisplayName(),
+ IS_SYNCHRONOUS_CONSUMER.getDisplayName(),
+ TOTAL_NUMBER_OF_CONSUMERS.getDisplayName(),
+ TOTAL_NUMBER_OF_PRODUCERS.getDisplayName(),
+ TOTAL_PAYLOAD_PROCESSED.getDisplayName(),
+ TIME_TAKEN.getDisplayName(),
+ ERROR_MESSAGE.getDisplayName(),
+ MIN_LATENCY.getDisplayName(),
+ MAX_LATENCY.getDisplayName(),
+ LATENCY_STANDARD_DEVIATION.getDisplayName(),
+ RUN_ID,
+ INSERTED_TIMESTAMP
+ );
+
+ public static final String DRIVER_NAME = "jdbcDriverClass";
+ public static final String URL = "jdbcUrl";
+
+ private final String _url;
+ private final String _runId;
+
+ private final Clock _clock;
+
+ /**
+ * @param runId may be null, in which case a default value is chosen based on current GMT time
+ * @param context must contain environment entries {@value #DRIVER_NAME} and {@value #URL}.
+ */
+ public ResultsDbWriter(Context context, String runId)
+ {
+ this(context, runId, new Clock());
+ }
+
+ /** only call directly from tests */
+ ResultsDbWriter(Context context, String runId, Clock clock)
+ {
+ _clock = clock;
+ _runId = defaultIfNullRunId(runId);
+
+ _url = initialiseJdbc(context);
+ }
+
+ private String defaultIfNullRunId(String runId)
+ {
+ if(runId == null)
+ {
+ Date dateNow = new Date(_clock.currentTimeMillis());
+ Calendar calNow = Calendar.getInstance(TimeZone.getTimeZone("GMT+00:00"));
+ calNow.setTime(dateNow);
+ return String.format("run %1$tF %1$tT.%tL", calNow);
+ }
+ else
+ {
+ return runId;
+ }
+ }
+
+ public String getRunId()
+ {
+ return _runId;
+ }
+
+ /**
+ * Uses the context's environment to load the JDBC driver class and return the
+ * JDBC URL specified therein.
+ * @return the JDBC URL
+ */
+ private String initialiseJdbc(Context context)
+ {
+ Hashtable<?, ?> environment = null;
+ try
+ {
+ environment = context.getEnvironment();
+
+ String driverName = (String) environment.get(DRIVER_NAME);
+ if(driverName == null)
+ {
+ throw new IllegalArgumentException("JDBC driver name " + DRIVER_NAME
+ + " missing from context environment: " + environment);
+ }
+
+ Class.forName(driverName);
+
+ Object url = environment.get(URL);
+ if(url == null)
+ {
+ throw new IllegalArgumentException("JDBC URL " + URL + " missing from context environment: " + environment);
+ }
+ return (String) url;
+ }
+ catch (NamingException e)
+ {
+ throw constructorRethrow(e, environment);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw constructorRethrow(e, environment);
+ }
+ }
+
+ private RuntimeException constructorRethrow(Exception e, Hashtable<?, ?> environment)
+ {
+ return new RuntimeException("Couldn't initialise ResultsDbWriter from context with environment" + environment, e);
+ }
+
+ public void createResultsTableIfNecessary()
+ {
+ try
+ {
+ Connection connection = null;
+ try
+ {
+ connection = DriverManager.getConnection(_url);
+ if(!tableExists(RESULTS_TABLE_NAME, connection))
+ {
+ Statement statement = connection.createStatement();
+ try
+ {
+ _logger.info("About to create results table using SQL: " + CREATE_RESULTS_TABLE);
+ statement.execute(CREATE_RESULTS_TABLE);
+ }
+ finally
+ {
+ statement.close();
+ }
+ }
+ }
+ finally
+ {
+ if(connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException("Couldn't create results table", e);
+ }
+
+ }
+
+ private boolean tableExists(final String tableName, final Connection conn) throws SQLException
+ {
+ PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTENCE_QUERY);
+ try
+ {
+ stmt.setString(1, tableName);
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ return rs.next();
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+
+ public void writeResults(ResultsForAllTests results)
+ {
+ try
+ {
+ writeResultsThrowingException(results);
+ }
+ catch (SQLException e)
+ {
+ throw new RuntimeException("Couldn't write results " + results, e);
+ }
+ _logger.info(this + " wrote " + results.getTestResults().size() + " results to database");
+ }
+
+ private void writeResultsThrowingException(ResultsForAllTests results) throws SQLException
+ {
+ Connection connection = null;
+ try
+ {
+ connection = DriverManager.getConnection(_url);
+
+ for (ITestResult testResult : results.getTestResults())
+ {
+ for (ParticipantResult participantResult : testResult.getParticipantResults())
+ {
+ writeParticipantResult(connection, participantResult);
+ }
+ }
+ }
+ finally
+ {
+ if(connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private void writeParticipantResult(Connection connection, ParticipantResult participantResult) throws SQLException
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("About to write to DB the following participant result: " + participantResult);
+ }
+
+ PreparedStatement statement = null;
+ try
+ {
+ String sqlTemplate = String.format(
+ "INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) " +
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ RESULTS_TABLE_NAME,
+ TEST_NAME.getDisplayName(),
+ ITERATION_NUMBER.getDisplayName(),
+ PARTICIPANT_NAME.getDisplayName(),
+ THROUGHPUT.getDisplayName(),
+ AVERAGE_LATENCY.getDisplayName(),
+ CONFIGURED_CLIENT_NAME.getDisplayName(),
+ NUMBER_OF_MESSAGES_PROCESSED.getDisplayName(),
+ PAYLOAD_SIZE.getDisplayName(),
+ PRIORITY.getDisplayName(),
+ TIME_TO_LIVE.getDisplayName(),
+ ACKNOWLEDGE_MODE.getDisplayName(),
+ DELIVERY_MODE.getDisplayName(),
+ BATCH_SIZE.getDisplayName(),
+ MAXIMUM_DURATION.getDisplayName(),
+ PRODUCER_START_DELAY.getDisplayName(),
+ PRODUCER_INTERVAL.getDisplayName(),
+ IS_TOPIC.getDisplayName(),
+ IS_DURABLE_SUBSCRIPTION.getDisplayName(),
+ IS_BROWSING_SUBSCRIPTION.getDisplayName(),
+ IS_SELECTOR.getDisplayName(),
+ IS_NO_LOCAL.getDisplayName(),
+ IS_SYNCHRONOUS_CONSUMER.getDisplayName(),
+ TOTAL_NUMBER_OF_CONSUMERS.getDisplayName(),
+ TOTAL_NUMBER_OF_PRODUCERS.getDisplayName(),
+ TOTAL_PAYLOAD_PROCESSED.getDisplayName(),
+ TIME_TAKEN.getDisplayName(),
+ ERROR_MESSAGE.getDisplayName(),
+ MIN_LATENCY.getDisplayName(),
+ MAX_LATENCY.getDisplayName(),
+ LATENCY_STANDARD_DEVIATION.getDisplayName(),
+ RUN_ID,
+ INSERTED_TIMESTAMP
+ );
+ statement = connection.prepareStatement(sqlTemplate);
+
+ int columnIndex = 1;
+ statement.setString(columnIndex++, participantResult.getTestName());
+ statement.setInt(columnIndex++, participantResult.getIterationNumber());
+ statement.setString(columnIndex++, participantResult.getParticipantName());
+ statement.setDouble(columnIndex++, participantResult.getThroughput());
+ statement.setDouble(columnIndex++, participantResult.getAverageLatency());
+ statement.setString(columnIndex++, participantResult.getConfiguredClientName());
+ statement.setLong(columnIndex++, participantResult.getNumberOfMessagesProcessed());
+ statement.setLong(columnIndex++, participantResult.getPayloadSize());
+ statement.setLong(columnIndex++, participantResult.getPriority());
+ statement.setLong(columnIndex++, participantResult.getTimeToLive());
+ statement.setLong(columnIndex++, participantResult.getAcknowledgeMode());
+ statement.setLong(columnIndex++, participantResult.getDeliveryMode());
+ statement.setLong(columnIndex++, participantResult.getBatchSize());
+ statement.setLong(columnIndex++, participantResult.getMaximumDuration());
+ statement.setLong(columnIndex++, 0 /* TODO PRODUCER_START_DELAY*/);
+ statement.setLong(columnIndex++, 0 /* TODO PRODUCER_INTERVAL*/);
+ statement.setLong(columnIndex++, 0 /* TODO IS_TOPIC*/);
+ statement.setLong(columnIndex++, 0 /* TODO IS_DURABLE_SUBSCRIPTION*/);
+ statement.setLong(columnIndex++, 0 /* TODO IS_BROWSING_SUBSCRIPTION*/);
+ statement.setLong(columnIndex++, 0 /* TODO IS_SELECTOR*/);
+ statement.setLong(columnIndex++, 0 /* TODO IS_NO_LOCAL*/);
+ statement.setLong(columnIndex++, 0 /* TODO IS_SYNCHRONOUS_CONSUMER*/);
+ statement.setLong(columnIndex++, participantResult.getTotalNumberOfConsumers());
+ statement.setLong(columnIndex++, participantResult.getTotalNumberOfProducers());
+ statement.setLong(columnIndex++, participantResult.getTotalPayloadProcessed());
+ statement.setLong(columnIndex++, participantResult.getTimeTaken());
+ statement.setString(columnIndex++, participantResult.getErrorMessage());
+ statement.setLong(columnIndex++, participantResult.getMinLatency());
+ statement.setLong(columnIndex++, participantResult.getMaxLatency());
+ statement.setDouble(columnIndex++, participantResult.getLatencyStandardDeviation());
+
+ statement.setString(columnIndex++, _runId);
+ statement.setTimestamp(columnIndex++, new Timestamp(_clock.currentTimeMillis()));
+
+ statement.execute();
+ connection.commit();
+ }
+ catch(SQLException e)
+ {
+ _logger.error("Couldn't write " + participantResult, e);
+ }
+ finally
+ {
+ if (statement != null)
+ {
+ statement.close();
+ }
+ }
+ }
+
+ public static class Clock
+ {
+ public long currentTimeMillis()
+ {
+ return System.currentTimeMillis();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("runId", _runId)
+ .append("url", _url)
+ .toString();
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
index 3f8afc9a9a..f242111dc5 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
@@ -218,7 +218,15 @@ public class ClientJmsDelegate
synchronized(session)
{
- final Destination destination = session.createQueue(command.getDestinationName());
+ final Destination destination;
+ if(command.isTopic())
+ {
+ destination = session.createTopic(command.getDestinationName());
+ }
+ else
+ {
+ destination = session.createQueue(command.getDestinationName());
+ }
final MessageProducer jmsProducer = session.createProducer(destination);
@@ -373,30 +381,6 @@ public class ClientJmsDelegate
}
}
- public void commitOrAcknowledgeMessage(final Message message, final String sessionName)
- {
- try
- {
- final Session session = _testSessions.get(sessionName);
- if (session.getTransacted())
- {
- synchronized(session)
- {
- session.commit();
- }
- }
- else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- message.acknowledge();
- }
- }
- catch (final JMSException jmse)
- {
- throw new DistributedTestException("Unable to commit or acknowledge message on session: " +
- sessionName, jmse);
- }
- }
-
public int getAcknowledgeMode(final String sessionName)
{
try
@@ -493,31 +477,36 @@ public class ClientJmsDelegate
}
}
- public void rollbackOrRecover(String sessionName)
+ public void commitOrAcknowledgeMessageIfNecessary(final String sessionName, final Message message)
{
try
{
final Session session = _testSessions.get(sessionName);
- synchronized(session)
+ if (session.getTransacted())
{
- if (session.getTransacted())
- {
- session.rollback();
- }
- else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ synchronized(session)
{
- session.recover();
+ session.commit();
}
}
+ else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ message.acknowledge();
+ }
}
catch (final JMSException jmse)
{
- throw new DistributedTestException("Unable to rollback or recover on session: " +
+ throw new DistributedTestException("Unable to commit or acknowledge message on session: " +
sessionName, jmse);
}
}
- public void releaseMessage(String sessionName)
+ public void commitIfNecessary(final String sessionName)
+ {
+ commitOrAcknowledgeMessageIfNecessary(sessionName, null);
+ }
+
+ public void rollbackOrRecoverIfNecessary(String sessionName)
{
try
{
@@ -528,7 +517,7 @@ public class ClientJmsDelegate
{
session.rollback();
}
- else
+ else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
session.recover();
}
@@ -536,7 +525,8 @@ public class ClientJmsDelegate
}
catch (final JMSException jmse)
{
- LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse);
+ throw new DistributedTestException("Unable to rollback or recover on session: " +
+ sessionName, jmse);
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
index c80e641e5c..782f7ae2fd 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
@@ -224,12 +224,12 @@ public class ControllerJmsDelegate
public void createQueues(List<QueueConfig> queues)
{
- _queueCreator.createQueues(_session, queues);
+ _queueCreator.createQueues(_connection, _session, queues);
}
public void deleteQueues(List<QueueConfig> queues)
{
- _queueCreator.deleteQueues(_session, queues);
+ _queueCreator.deleteQueues(_connection, _session, queues);
}
public void addCommandListener(CommandListener commandListener)
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
index 4d4850eccf..d7e0007b28 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
@@ -20,18 +20,19 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
import javax.jms.Session;
import org.apache.qpid.disttest.controller.config.QueueConfig;
public class NoOpQueueCreator implements QueueCreator
{
@Override
- public void createQueues(Session session, List<QueueConfig> configs)
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
{
}
@Override
- public void deleteQueues(Session session, List<QueueConfig> configs)
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
{
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
index 6874abe7d4..ef2cfb6cd4 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -20,21 +20,29 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
+
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
+ private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
+ private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
@Override
- public void createQueues(Session session, List<QueueConfig> configs)
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
@@ -44,12 +52,88 @@ public class QpidQueueCreator implements QueueCreator
}
@Override
- public void deleteQueues(Session session, List<QueueConfig> configs)
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
{
- deleteQueue(amqSession, queueConfig);
+ AMQDestination destination = createAMQDestination(amqSession, queueConfig);
+
+ // drainQueue method is added because deletion of queue with a lot
+ // of messages takes time and might cause the timeout exception
+ drainQueue(connection, destination);
+
+ deleteQueue(amqSession, destination.getAMQQueueName());
+ }
+ }
+
+ private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig)
+ {
+ try
+ {
+ return (AMQDestination) amqSession.createQueue(queueConfig.getName());
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e);
+ }
+ }
+
+ private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination destination)
+ {
+ try
+ {
+ long queueDepth = amqSession.getQueueDepth(destination);
+ return queueDepth;
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to query queue depth:" + destination, e);
+ }
+ }
+
+ private void drainQueue(Connection connection, AMQDestination destination)
+ {
+ Session noAckSession = null;
+ try
+ {
+ LOGGER.debug("About to drain the queue {}", destination.getQueueName());
+ noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+
+ long currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
+ int counter = 0;
+ while (currentQueueDepth > 0)
+ {
+ LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), currentQueueDepth);
+
+ while(messageConsumer.receive(_drainPollTimeout) != null)
+ {
+ counter++;
+ }
+
+ currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
+ }
+ LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName());
+ messageConsumer.close();
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to drain queue:" + destination, e);
+ }
+ finally
+ {
+ if (noAckSession != null)
+ {
+ try
+ {
+ noAckSession.close();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e);
+ }
+ }
}
}
@@ -66,7 +150,7 @@ public class QpidQueueCreator implements QueueCreator
EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(),
destination, autoDelete);
- LOGGER.debug("Created queue " + queueConfig);
+ LOGGER.debug("Created queue {}", queueConfig);
}
catch (Exception e)
{
@@ -74,20 +158,19 @@ public class QpidQueueCreator implements QueueCreator
}
}
- private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig)
+ private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName)
{
try
{
// The Qpid AMQSession API currently makes the #deleteQueue method protected and the
// raw protocol method public. This should be changed then we should switch the below to
// use #deleteQueue.
- AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
- session.sendQueueDelete(destination.getAMQQueueName());
- LOGGER.debug("Deleted queue " + queueConfig.getName());
+ session.sendQueueDelete(queueName);
+ LOGGER.debug("Deleted queue {}", queueName);
}
catch (Exception e)
{
- throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e);
+ throw new DistributedTestException("Failed to delete queue:" + queueName, e);
}
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
index 0947dd53cb..a37cd7888c 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
@@ -20,12 +20,13 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
import javax.jms.Session;
import org.apache.qpid.disttest.controller.config.QueueConfig;
public interface QueueCreator
{
- public void createQueues(final Session session, final List<QueueConfig> configs);
- public void deleteQueues(final Session session, final List<QueueConfig> configs);
+ void createQueues(Connection connection, Session session, List<QueueConfig> configs);
+ void deleteQueues(Connection connection, Session session, List<QueueConfig> configs);
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
index ad9aa31472..e78f6965d2 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
@@ -134,6 +134,7 @@ public class ConsumerParticipantResult extends ParticipantResult
_messageLatencies = messageLatencies;
}
+ @Override
@OutputAttribute(attribute=ParticipantAttribute.MIN_LATENCY)
public long getMinLatency()
{
@@ -145,6 +146,7 @@ public class ConsumerParticipantResult extends ParticipantResult
_minLatency = minLatency;
}
+ @Override
@OutputAttribute(attribute=ParticipantAttribute.MAX_LATENCY)
public long getMaxLatency()
{
@@ -156,6 +158,7 @@ public class ConsumerParticipantResult extends ParticipantResult
_maxLatency = maxLatency;
}
+ @Override
@OutputAttribute(attribute=ParticipantAttribute.AVERAGE_LATENCY)
public double getAverageLatency()
{
@@ -167,6 +170,7 @@ public class ConsumerParticipantResult extends ParticipantResult
_averageLatency = averageLatency;
}
+ @Override
@OutputAttribute(attribute=ParticipantAttribute.LATENCY_STANDARD_DEVIATION)
public double getLatencyStandardDeviation()
{
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
index 68c21fbf83..07a60504c8 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
@@ -21,7 +21,6 @@ package org.apache.qpid.disttest.message;
public class CreateConsumerCommand extends CreateParticpantCommand
{
- private boolean _isTopic;
private boolean _isDurableSubscription;
private boolean _isBrowsingSubscription;
private String _selector;
@@ -75,16 +74,6 @@ public class CreateConsumerCommand extends CreateParticpantCommand
this._noLocal = noLocal;
}
- public boolean isTopic()
- {
- return _isTopic;
- }
-
- public void setTopic(boolean isTopic)
- {
- this._isTopic = isTopic;
- }
-
public boolean isSynchronous()
{
return _synchronous;
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java
index b1caa6ef75..e7349bf795 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
public abstract class CreateParticpantCommand extends Command
{
+ private boolean _isTopic;
private String _participantName;
private String _sessionName;
private String _destinationName;
@@ -65,6 +66,16 @@ public abstract class CreateParticpantCommand extends Command
_destinationName = destinationName;
}
+ public boolean isTopic()
+ {
+ return _isTopic;
+ }
+
+ public void setTopic(boolean isTopic)
+ {
+ _isTopic = isTopic;
+ }
+
public long getNumberOfMessages()
{
return _numberOfMessages;
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
index 0418562a2d..1154ff306c 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.disttest.message;
+import java.text.DecimalFormat;
+
import org.apache.qpid.disttest.client.Participant;
/**
@@ -31,6 +33,8 @@ public enum ParticipantAttribute
{
TEST_NAME("testName"),
ITERATION_NUMBER("iterationNumber"),
+ THROUGHPUT("throughputKbPerS", "#"),
+ AVERAGE_LATENCY("averageLatency", "#"),
CONFIGURED_CLIENT_NAME("clientName"),
PARTICIPANT_NAME("participantName"),
NUMBER_OF_MESSAGES_PROCESSED("numberOfMessages"),
@@ -52,24 +56,56 @@ public enum ParticipantAttribute
TOTAL_NUMBER_OF_CONSUMERS("totalNumberOfConsumers"),
TOTAL_NUMBER_OF_PRODUCERS("totalNumberOfProducers"),
TOTAL_PAYLOAD_PROCESSED("totalPayloadProcessedB"),
- THROUGHPUT("throughputKbPerS"),
TIME_TAKEN("timeTakenMs"),
ERROR_MESSAGE("errorMessage"),
MIN_LATENCY("minLatency"),
MAX_LATENCY("maxLatency"),
- AVERAGE_LATENCY("averageLatency"),
- LATENCY_STANDARD_DEVIATION("latencyStandardDeviation")
+ LATENCY_STANDARD_DEVIATION("latencyStandardDeviation"),
+ MESSAGE_THROUGHPUT("throughputMessagesPerS")
;
private String _displayName;
+ private String _decimalFormat;
ParticipantAttribute(String displayName)
{
_displayName = displayName;
}
+ ParticipantAttribute(String displayName, String decimalFormat)
+ {
+ _displayName = displayName;
+ _decimalFormat = decimalFormat;
+ }
+
+ public String getDecimalFormat()
+ {
+ return _decimalFormat;
+ }
+
public String getDisplayName()
{
return _displayName;
}
+
+ public String format(Object attributeValue)
+ {
+ if(attributeValue == null)
+ {
+ return null;
+ }
+
+ String attributeAsString = String.valueOf(attributeValue);
+
+ if(_decimalFormat != null)
+ {
+ DecimalFormat decimalFormat = new DecimalFormat(_decimalFormat);
+ double attributeAsDoule = Double.valueOf(attributeAsString);
+ return decimalFormat.format(attributeAsDoule);
+ }
+ else
+ {
+ return attributeAsString;
+ }
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java
index a6d3d91bae..0a824a316b 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java
@@ -22,11 +22,12 @@ import static org.apache.qpid.disttest.message.ParticipantAttribute.BATCH_SIZE;
import static org.apache.qpid.disttest.message.ParticipantAttribute.CONFIGURED_CLIENT_NAME;
import static org.apache.qpid.disttest.message.ParticipantAttribute.ITERATION_NUMBER;
import static org.apache.qpid.disttest.message.ParticipantAttribute.MAXIMUM_DURATION;
-import static org.apache.qpid.disttest.message.ParticipantAttribute.PAYLOAD_SIZE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.MESSAGE_THROUGHPUT;
import static org.apache.qpid.disttest.message.ParticipantAttribute.NUMBER_OF_MESSAGES_PROCESSED;
-import static org.apache.qpid.disttest.message.ParticipantAttribute.THROUGHPUT;
import static org.apache.qpid.disttest.message.ParticipantAttribute.PARTICIPANT_NAME;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PAYLOAD_SIZE;
import static org.apache.qpid.disttest.message.ParticipantAttribute.TEST_NAME;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.THROUGHPUT;
import java.util.Comparator;
import java.util.Date;
@@ -49,6 +50,7 @@ public class ParticipantResult extends Response
private long _totalPayloadProcessed;
private int _payloadSize;
private double _throughput;
+ private int _messageThroughput;
private int _totalNumberOfConsumers;
private int _totalNumberOfProducers;
@@ -236,6 +238,17 @@ public class ParticipantResult extends Response
_throughput = throughput;
}
+ @OutputAttribute(attribute=MESSAGE_THROUGHPUT)
+ public int getMessageThroughput()
+ {
+ return _messageThroughput;
+ }
+
+ public void setMessageThroughput(int throughput)
+ {
+ _messageThroughput = throughput;
+ }
+
public void setTotalNumberOfConsumers(int totalNumberOfConsumers)
{
_totalNumberOfConsumers = totalNumberOfConsumers;
@@ -269,4 +282,41 @@ public class ParticipantResult extends Response
_acknowledgeMode = acknowledgeMode;
}
+ public double getLatencyStandardDeviation()
+ {
+ return 0.0;
+ }
+
+ @OutputAttribute(attribute = ParticipantAttribute.MIN_LATENCY)
+ public long getMinLatency()
+ {
+ return 0;
+ }
+
+ @OutputAttribute(attribute = ParticipantAttribute.MAX_LATENCY)
+ public long getMaxLatency()
+ {
+ return 0;
+ }
+
+ @OutputAttribute(attribute = ParticipantAttribute.AVERAGE_LATENCY)
+ public double getAverageLatency()
+ {
+ return 0;
+ }
+
+ public int getPriority()
+ {
+ return 0;
+ }
+
+ public long getTimeToLive()
+ {
+ return 0;
+ }
+
+ public int getDeliveryMode()
+ {
+ return 0;
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java
index 766c90eec8..2d9399a3d3 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java
@@ -42,6 +42,7 @@ public class ProducerParticipantResult extends ParticipantResult
setParticipantName(participantName);
}
+ @Override
@OutputAttribute(attribute=PRIORITY)
public int getPriority()
{
@@ -53,6 +54,7 @@ public class ProducerParticipantResult extends ParticipantResult
_priority = priority;
}
+ @Override
@OutputAttribute(attribute=TIME_TO_LIVE)
public long getTimeToLive()
{
@@ -86,6 +88,7 @@ public class ProducerParticipantResult extends ParticipantResult
_interval = producerInterval;
}
+ @Override
@OutputAttribute(attribute=DELIVERY_MODE)
public int getDeliveryMode()
{
@@ -96,5 +99,4 @@ public class ProducerParticipantResult extends ParticipantResult
{
this._deliveryMode = deliveryMode;
}
-
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java
index 3f9cdff69d..6230067486 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java
@@ -22,10 +22,8 @@ import java.util.List;
import org.apache.qpid.disttest.message.ParticipantResult;
-// TODO rename me!!
public interface ITestResult
{
-
// TODO should weaken to Collection
List<ParticipantResult> getParticipantResults();
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
index 4dcabe6c7b..c21a78d359 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
@@ -142,6 +142,7 @@ public class ParticipantResultAggregator
aggregatedResult.setStartDate(new Date(_minStartDate));
aggregatedResult.setEndDate(new Date(_maxEndDate));
aggregatedResult.setThroughput(calculateThroughputInKiloBytesPerSecond());
+ aggregatedResult.setMessageThroughput(calculateThroughputInMessagesPerSecond());
}
private void setRolledUpConstantAttributes(ParticipantResult aggregatedResult)
@@ -197,4 +198,14 @@ public class ParticipantResultAggregator
return totalPayloadProcessedInKiloBytes/durationInSeconds;
}
+ private int calculateThroughputInMessagesPerSecond()
+ {
+ double durationInMillis = _maxEndDate - _minStartDate;
+ if (durationInMillis == 0 )
+ {
+ return 0;
+ }
+
+ return (int)Math.round((_numberOfMessagesProcessed * 1000.0d)/durationInMillis);
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java
index 5934e0e997..954828b043 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java
@@ -18,6 +18,9 @@
*/
package org.apache.qpid.disttest.results.aggregation;
+import java.util.List;
+
+import org.apache.qpid.disttest.controller.ResultsForAllTests;
import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.message.ProducerParticipantResult;
@@ -102,5 +105,26 @@ public class TestResultAggregator
aggregatedAllResult.setNumberOfMessagesProcessed(aggregatedConsumerResult.getNumberOfMessagesProcessed());
aggregatedAllResult.setTotalPayloadProcessed(aggregatedConsumerResult.getTotalPayloadProcessed());
aggregatedAllResult.setThroughput(aggregatedConsumerResult.getThroughput());
+ aggregatedAllResult.setMessageThroughput(aggregatedConsumerResult.getMessageThroughput());
+ }
+
+ /**
+ * Produces a single {@link ResultsForAllTests} from the supplied list, only containing
+ * the "All participants" results.
+ */
+ public ResultsForAllTests aggregateTestResults(List<ResultsForAllTests> allResultsList)
+ {
+ ResultsForAllTests retVal = new ResultsForAllTests();
+
+ for (ResultsForAllTests resultsForAllTests : allResultsList)
+ {
+ ResultsForAllTests allParticipantsResult = resultsForAllTests.getAllParticipantsResult();
+ for (ITestResult testResult : allParticipantsResult.getTestResults())
+ {
+ retVal.add(testResult);
+ }
+ }
+
+ return retVal;
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java b/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormatter.java
index 52e53ca624..ea7a3f78c7 100644
--- a/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java
+++ b/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormatter.java
@@ -32,7 +32,7 @@ import org.apache.qpid.disttest.results.aggregation.ITestResult;
/**
* produces CSV output using the ordered enums in {@link ParticipantAttribute}
*/
-public class CSVFormater
+public class CSVFormatter
{
public String format(ResultsForAllTests results)
{
@@ -66,7 +66,9 @@ public class CSVFormater
List<Object> attributeValues = new ArrayList<Object>();
for (ParticipantAttribute attribute : ParticipantAttribute.values())
{
- attributeValues.add(attributeValueMap.get(attribute));
+ Object attributeValue = attributeValueMap.get(attribute);
+ String attributeValueFormatted = attribute.format(attributeValue);
+ attributeValues.add(attributeValueFormatted);
}
String row = StringUtils.join(attributeValues.toArray(), ",");