summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-12-13 00:41:01 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-12-13 00:41:01 +0000
commited5bad4e326394dfdc452f7df8c62d7beb8e21f1 (patch)
tree31b0bc4ff0c671dde8f13c8ce0a90bdba948872e
parentb06f4cd3bdce7aba635e04af0f21cd6ab9400df3 (diff)
downloadqpid-python-ed5bad4e326394dfdc452f7df8c62d7beb8e21f1.tar.gz
QPID-2970: Make performace tests multi-threaded
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1044979 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/java/perftests/etc/dlq/Framework.sh10
-rwxr-xr-xqpid/java/perftests/etc/dlq/Statistics.sh34
-rwxr-xr-xqpid/java/perftests/etc/dlq/Test.sh34
-rw-r--r--qpid/java/perftests/etc/dlq/config.properties9
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java13
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java79
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java26
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java58
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java75
9 files changed, 260 insertions, 78 deletions
diff --git a/qpid/java/perftests/etc/dlq/Framework.sh b/qpid/java/perftests/etc/dlq/Framework.sh
index f6db87e557..3704e1a593 100755
--- a/qpid/java/perftests/etc/dlq/Framework.sh
+++ b/qpid/java/perftests/etc/dlq/Framework.sh
@@ -18,8 +18,16 @@
# under the License.
#
+# Set Qpid Version
+VERSION=0.5
+
# Setup Java CLASSPATH
-CLASSPATH=${QPID_HOME}/lib/qpid-all-${VERSION}.jar:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${QPID_HOME}/lib/qpid-all.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar
# Run Performance Test Framework
echo "Running DLQ Performance Tests"
diff --git a/qpid/java/perftests/etc/dlq/Statistics.sh b/qpid/java/perftests/etc/dlq/Statistics.sh
new file mode 100755
index 0000000000..1e615407de
--- /dev/null
+++ b/qpid/java/perftests/etc/dlq/Statistics.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set Qpid Version
+VERSION=0.5
+
+# Setup Java CLASSPATH
+CLASSPATH=${QPID_HOME}/lib/qpid-all.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar
+
+# Run Performance Test Framework
+echo "Running DLQ Performance Tests"
+java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceStatistics $*
diff --git a/qpid/java/perftests/etc/dlq/Test.sh b/qpid/java/perftests/etc/dlq/Test.sh
new file mode 100755
index 0000000000..713d729bda
--- /dev/null
+++ b/qpid/java/perftests/etc/dlq/Test.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set Qpid Version
+VERSION=0.5
+
+# Setup Java CLASSPATH
+CLASSPATH=${QPID_HOME}/lib/qpid-all.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar
+
+# Run Performance Test Framework
+echo "Running DLQ Performance Tests"
+java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceTest $*
diff --git a/qpid/java/perftests/etc/dlq/config.properties b/qpid/java/perftests/etc/dlq/config.properties
index 1d06339905..b1342158f8 100644
--- a/qpid/java/perftests/etc/dlq/config.properties
+++ b/qpid/java/perftests/etc/dlq/config.properties
@@ -4,22 +4,21 @@
repeat = 10
# shared properties
-broker = tcp://magenta:5672
-#broker = tcp://localhost:5672
+broker = tcp://localhost:5672
maxRedelivery = 3
maxPrefetch = 1
session = SESSION_TRANSACTED
queue = test
count = 1000
persistent = true
-maxRecords = 2000
+maxRecords = 10000
# producer properties
size = 4096
messageIds = true
## consumer properties
-threads = 1
+threads = 5
listener = false
reject = 2
-rejectCount = 3 \ No newline at end of file
+rejectCount = 3
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
index c87ceae942..81517b6fbc 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
@@ -16,6 +16,14 @@ import org.apache.qpid.client.configuration.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Parent abstract class for all performance test clients that connect to a
+ * broker and perform test operations. All clients are {@link Callable}
+ * objects that return an integer value, or throw an exception. The
+ * {@link #connect()} method returns a boolean to indicate whether the
+ * broker connection succeeded, and can be used to abort tests if there is
+ * no available broker.
+ */
public abstract class Client implements Callable<Integer>
{
protected static final Logger _log = LoggerFactory.getLogger(Client.class);
@@ -102,7 +110,7 @@ public abstract class Client implements Callable<Integer>
throw new RuntimeException("session property not recognised: " + sessionType);
}
- public void connect()
+ public boolean connect()
{
String url = "amqp://guest:guest@" + _client + "/test?brokerlist='" + _broker + "'&maxprefetch='" + _maxPrefetch + "'&maxdeliverycount='" + _maxRedelivery + "'";
System.setProperty(ClientProperties.MAX_DELIVERY_RECORDS_PROP_NAME, Integer.toString(_maxRecords));
@@ -120,11 +128,12 @@ public abstract class Client implements Callable<Integer>
System.exit(0);
}
});
+ return true;
}
catch (Exception e)
{
_log.error("Unable to setup connection, client and producer on broker", e);
- throw new RuntimeException(e);
+ return false;
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
index 1eb3c47983..f38e2c0888 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
@@ -11,11 +11,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.Session;
import org.apache.qpid.perftests.dlq.test.PerformanceTest;
-
-
public class Receiver extends Client
{
private MessageConsumer _consumer;
@@ -23,11 +22,16 @@ public class Receiver extends Client
private int _reject;
private int _rejectCount;
private Map<Integer, Integer> _rejected = new HashMap<Integer, Integer>();
+ private int _receivedCount = 0;
private static volatile boolean _stopped;
private static CountDownLatch _finished;
private static AtomicInteger _id;
- private static AtomicInteger _received;
+ private static AtomicInteger _totalReceivedCount;
+ private static AtomicInteger _totalConsumedCount;
+ private static AtomicInteger _rejectedCount;
+ private static int _consumedCheck;
+ private static int _rejectedCheck;
public Receiver(Properties props)
{
@@ -39,17 +43,25 @@ public class Receiver extends Client
public static void reset()
{
_id = new AtomicInteger(0);
- _received = new AtomicInteger(0);
+ _totalReceivedCount = new AtomicInteger(0);
+ _totalConsumedCount = new AtomicInteger(0);
+ _rejectedCount = new AtomicInteger(0);
_finished = new CountDownLatch(1);
_stopped = false;
}
+
- public void start() throws Exception
+ public synchronized void start() throws Exception
{
_listener = Boolean.parseBoolean(_props.getProperty(LISTENER));
_reject = Integer.parseInt(_props.getProperty(REJECT));
_rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT));
-
+
+ boolean sessionOk = (_transacted || _clientAck) ||
+ ((_sessionType == Session.AUTO_ACKNOWLEDGE || _sessionType == Session.DUPS_OK_ACKNOWLEDGE) && _listener);
+ _rejectedCheck = (!sessionOk || _messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject;
+ _consumedCheck = (_count - _rejectedCheck); // + (sessionOk ? ((_count / _reject) * _rejectCount) : 0);
+
_consumer = _session.createConsumer(_queue);
_connection.start();
@@ -71,7 +83,10 @@ public class Receiver extends Client
while (!_stopped)
{
Message msg = _consumer.receive(1000);
- processMessage(msg);
+ if (msg != null)
+ {
+ processMessage(msg);
+ }
}
}
@@ -79,7 +94,7 @@ public class Receiver extends Client
{
try
{
- _received.incrementAndGet();
+ _totalReceivedCount.incrementAndGet();
int number = msg.getIntProperty("number");
if (number % 100 == 0)
{
@@ -90,17 +105,24 @@ public class Receiver extends Client
if (rejectMessage)
{
int rejectCount = 0;
- if (_rejected.containsKey(number))
+ if (!_rejected.containsKey(number))
{
- rejectCount = _rejected.get(number);
+ _rejected.put(number, 0);
}
- _rejected.put(number, ++rejectCount);
+ rejectCount = _rejected.get(number) + 1;
+ _rejected.put(number, rejectCount);
if (rejectCount <= _rejectCount)
{
- if (rejectCount >= _maxRedelivery)
+ if (rejectCount == _maxRedelivery)
{
- _log.info("rejecting message (" + rejectCount + ") " + msg.getJMSMessageID());
+ _rejectedCount.incrementAndGet();
+ _log.info("client " + _client + " rejecting message (" + rejectCount + ") " + msg.getJMSMessageID());
}
+ if (rejectCount > _maxRedelivery)
+ {
+ throw new RuntimeException("client " + _client + " received message " + msg.getJMSMessageID() +
+ " " + rejectCount + " times");
+ }
if (_transacted)
{
_session.rollback();
@@ -118,6 +140,8 @@ public class Receiver extends Client
if (!rejectMessage)
{
+ _receivedCount++;
+ _totalConsumedCount.incrementAndGet();
if (_transacted)
{
_session.commit();
@@ -126,12 +150,14 @@ public class Receiver extends Client
{
msg.acknowledge();
}
- if (number == (_count - 1))
- {
- _stopped = true;
- _finished.countDown();
- }
}
+
+ if (_totalConsumedCount.get() >= _consumedCheck && _rejectedCount.get() >= _rejectedCheck)
+ {
+ _log.info("stopping receivers after " + _totalConsumedCount.get() + " received and " + _rejectedCount.get() + " rejected");
+ _stopped = true;
+ _finished.countDown();
+ }
}
catch (Exception e)
{
@@ -156,6 +182,21 @@ public class Receiver extends Client
_finished.await();
PerformanceTest.countDown();
- return _received.get();
+ return _receivedCount;
+ }
+
+ public static int getTotalReceivedCount()
+ {
+ return _totalReceivedCount.get();
+ }
+
+ public static int getConsumedCheck()
+ {
+ return _consumedCheck;
+ }
+
+ public static int getRejectedCheck()
+ {
+ return _rejectedCheck;
}
} \ No newline at end of file
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
index 3db257362c..0f29c1f5bf 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
@@ -8,7 +8,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
-import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
@@ -16,6 +15,10 @@ import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Run a series of different performance tests, based on a set of variations of
+ * configuration properties, and collect the results and generated statistics.
+ */
public class PerformanceFramework
{
private static final Logger _log = LoggerFactory.getLogger(PerformanceFramework.class);
@@ -69,7 +72,10 @@ public class PerformanceFramework
out.println(_id + "," + maxRedelivery + "," + rejectCount + "," +
Boolean.toString(messageIds == 0) + "," + Boolean.toString(listener == 0) +
"," + SESSION_VALUES[session]);
- runOnce(_id);
+ if (!runOnce(_id))
+ {
+ return;
+ }
}
}
}
@@ -77,21 +83,29 @@ public class PerformanceFramework
}
}
- public void runOnce(int id)
+ public boolean runOnce(int id)
{
PerformanceStatistics stats = new PerformanceStatistics(_props);
try
{
_log.info("starting test id " + id);
String fileId = String.format("%04d", id);
- stats.series(new File(_dir, fileId + "-series.csv"));
- _log.info("test id " + id + " completed ok");
- stats.statistics(new File(_dir, fileId + "-statistics.csv"));
+ if (stats.series(new File(_dir, fileId + "-series.csv")))
+ {
+ _log.info("test id " + id + " completed ok");
+ stats.statistics(new File(_dir, fileId + "-statistics.csv"));
+ }
+ else
+ {
+ _log.error("connection failure, test series aborted");
+ return false;
+ }
}
catch (Exception e)
{
_log.error("failed test id " + id + " with error", e);
}
+ return true;
}
public static void main(String[] argv) throws Exception
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
index a3f6a3fcba..7f009dbc08 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
@@ -15,6 +15,10 @@ import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Run a series of performance tests, based on specific configuration properties, and
+ * collect the results to generate statistics.
+ */
public class PerformanceStatistics
{
private static final Logger _log = LoggerFactory.getLogger(PerformanceStatistics.class);
@@ -22,6 +26,7 @@ public class PerformanceStatistics
private Properties _props;
private List<Double> _sent = new ArrayList<Double>();
private List<Double> _received = new ArrayList<Double>();
+ private List<Double> _consumed = new ArrayList<Double>();
private List<Double> _rejected = new ArrayList<Double>();
private List<Double> _duration = new ArrayList<Double>();
private List<Double> _throughputIn = new ArrayList<Double>();
@@ -50,23 +55,31 @@ public class PerformanceStatistics
_props = props;
}
- public void single(PrintStream out) throws Exception
+ public boolean single(PrintStream out) throws Exception
{
- PerformanceTest client = new PerformanceTest(_props);
- client.test();
- client.check(out);
- _sent.add(client.getSent());
- _received.add(client.getReceived());
- _rejected.add(client.getRejected());
- _duration.add(client.getDuration());
- _throughputIn.add(client.getThroughputIn());
- _throughputOut.add(client.getThroughputOut());
- _bandwidthIn.add(client.getBandwidthIn());
- _bandwidthOut.add(client.getBandwidthOut());
- _latency.add(client.getLatency());
+ PerformanceTest test = new PerformanceTest(_props);
+ if (test.test())
+ {
+ test.check(out);
+ _sent.add(test.getSent());
+ _received.add(test.getTotalReceived());
+ _consumed.add(test.getConsumed());
+ _rejected.add(test.getRejected());
+ _duration.add(test.getDuration());
+ _throughputIn.add(test.getThroughputIn());
+ _throughputOut.add(test.getThroughputOut());
+ _bandwidthIn.add(test.getBandwidthIn());
+ _bandwidthOut.add(test.getBandwidthOut());
+ _latency.add(test.getLatency());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- public void series(File file) throws Exception
+ public boolean series(File file) throws Exception
{
try
{
@@ -76,7 +89,10 @@ public class PerformanceStatistics
for (int i = 0; i < repeat; i++)
{
_log.info("starting individual test run " + i);
- single(out);
+ if (!single(out))
+ {
+ return false;
+ }
}
}
catch (Exception e)
@@ -86,6 +102,7 @@ public class PerformanceStatistics
_statistics.add(new Statistics(_sent, "sent"));
_statistics.add(new Statistics(_received, "received"));
+ _statistics.add(new Statistics(_consumed, "consumed"));
_statistics.add(new Statistics(_rejected, "rejected"));
_statistics.add(new Statistics(_duration, "duration"));
_statistics.add(new Statistics(_throughputIn, "throughputIn"));
@@ -93,6 +110,7 @@ public class PerformanceStatistics
_statistics.add(new Statistics(_bandwidthIn, "bandwidthIn"));
_statistics.add(new Statistics(_bandwidthOut, "bandwidthOut"));
_statistics.add(new Statistics(_latency, "latency"));
+ return true;
}
public void statistics(File file)
@@ -126,7 +144,13 @@ public class PerformanceStatistics
}
PerformanceStatistics stats = new PerformanceStatistics(propertyFile);
- stats.series(new File("series.csv"));
- stats.statistics(new File("statistics.csv"));
+ if (stats.series(new File("series.csv")))
+ {
+ stats.statistics(new File("statistics.csv"));
+ }
+ else
+ {
+ System.err.println("connection faulre, test series aborted");
+ }
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
index a8c7e7a905..a3be10a697 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
@@ -16,13 +16,20 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.qpid.perftests.dlq.client.Check;
+import org.apache.qpid.perftests.dlq.client.Client;
import org.apache.qpid.perftests.dlq.client.Create;
import org.apache.qpid.perftests.dlq.client.Receiver;
import org.apache.qpid.perftests.dlq.client.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-
+/**
+ * Run a single performance test, based on specific configuration properties.
+ */
public class PerformanceTest
{
+ private static final Logger _log = LoggerFactory.getLogger(PerformanceFramework.class);
+
private static CountDownLatch _latch;
private ExecutorService _executor;
@@ -30,7 +37,7 @@ public class PerformanceTest
private int _size = 0;
private int _threads = 0;
private int _sent = 0;
- private int _received = 0;
+ private int _consumed = 0;
private int _rejected = 0;
private long _started = 0;
private long _finished = 0;
@@ -62,10 +69,14 @@ public class PerformanceTest
_props = props;
}
- public void test() throws Exception
+ public boolean test() throws Exception
{
- Create create = new Create(_props);
- create.connect();
+ Client create = new Create(_props);
+ if (!create.connect())
+ {
+ _log.error("initial connection failed");
+ return false;
+ }
create.call();
create.shutdown();
@@ -82,16 +93,16 @@ public class PerformanceTest
_latch = new CountDownLatch(1);
_started = System.nanoTime();
- Sender sender = new Sender(_props);
+ Client sender = new Sender(_props);
sender.connect();
Future<Integer> send = _executor.submit(sender);
Receiver.reset();
List<Future<Integer>> receives = new ArrayList<Future<Integer>>();
- List<Receiver> receivers = new ArrayList<Receiver>();
+ List<Client> receivers = new ArrayList<Client>();
for (int i = 0; i < _threads; i++)
{
- Receiver receiver = new Receiver(_props);
+ Client receiver = new Receiver(_props);
receiver.connect();
receivers.add(receiver);
receives.add(_executor.submit(receiver));
@@ -104,10 +115,10 @@ public class PerformanceTest
_sent = send.get();
for (Future<Integer> receive : receives)
{
- _received += receive.get();
+ _consumed += receive.get();
}
- Check check = new Check(_props);
+ Client check = new Check(_props);
check.connect();
_rejected = check.call();
check.shutdown();
@@ -119,12 +130,13 @@ public class PerformanceTest
finally
{
sender.shutdown();
- for (Receiver receiver : receivers)
+ for (Client receiver : receivers)
{
receiver.shutdown();
}
_executor.shutdownNow();
}
+ return true;
}
public void check(PrintStream out)
@@ -134,17 +146,13 @@ public class PerformanceTest
{
error.append("sent ").append(_sent).append(" not ").append(_count).append('\n');
}
- boolean sessionOk = ((_session.equalsIgnoreCase(CLIENT_ACKNOWLEDGE)) || (_session.equalsIgnoreCase(SESSION_TRANSACTED)) ||
- ((_session.equalsIgnoreCase(AUTO_ACKNOWLEDGE) || _session.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE)) && _listener));
- int rejected = (!sessionOk || !_messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject;
- if (_rejected != rejected)
+ if (_rejected != Receiver.getRejectedCheck())
{
- error.append("rejected ").append(_rejected).append(" not ").append(rejected).append('\n');
+ error.append("rejected ").append(_rejected).append(" not ").append(Receiver.getRejectedCheck()).append('\n');
}
- int received = (_count - rejected) + (sessionOk ? ((_count / _reject) * _rejectCount) : 0);
- if (_received != received)
+ if (_consumed != Receiver.getConsumedCheck())
{
- error.append("received ").append(_received).append(" not ").append(received).append('\n');
+ error.append("consumed ").append(_consumed).append(" not ").append(Receiver.getConsumedCheck()).append('\n');
}
if (error.length() > 0)
{
@@ -158,12 +166,12 @@ public class PerformanceTest
public static String getHeader()
{
- return "sent,received,rejected,duration";
+ return "sent,received,consumed,rejected,duration";
}
public String toString()
{
- String results = String.format("%d,%d,%d,%f", _sent, _received, _rejected, getDuration());
+ String results = String.format("%d,%d,%d,%d,%f", _sent, Receiver.getTotalReceivedCount(), _consumed, _rejected, getDuration());
return results;
}
@@ -172,9 +180,14 @@ public class PerformanceTest
return (double) _sent;
}
- public double getReceived()
+ public double getConsumed()
+ {
+ return (double) _consumed;
+ }
+
+ public double getTotalReceived()
{
- return (double) _received;
+ return (double) Receiver.getTotalReceivedCount();
}
public double getDuration()
@@ -194,7 +207,7 @@ public class PerformanceTest
public double getThroughputOut()
{
- return getReceived() / getDuration();
+ return getTotalReceived() / getDuration();
}
public double getBandwidthIn()
@@ -204,12 +217,12 @@ public class PerformanceTest
public double getBandwidthOut()
{
- return (getReceived() * (double) _size) / getDuration();
+ return (getTotalReceived() * (double) _size) / getDuration();
}
public double getLatency()
{
- return getDuration() / getReceived();
+ return getDuration() / getTotalReceived();
}
public static void countDown()
@@ -230,8 +243,14 @@ public class PerformanceTest
throw new RuntimeException("property file '" + propertyFile.getName() + "' must exist and be readable");
}
PerformanceTest client = new PerformanceTest(propertyFile);
- client.test();
- client.check(System.out);
+ if (client.test())
+ {
+ client.check(System.out);
+ }
+ else
+ {
+ System.err.println("test connection failure");
+ }
}
}