diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-13 00:41:01 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-12-13 00:41:01 +0000 |
commit | ed5bad4e326394dfdc452f7df8c62d7beb8e21f1 (patch) | |
tree | 31b0bc4ff0c671dde8f13c8ce0a90bdba948872e | |
parent | b06f4cd3bdce7aba635e04af0f21cd6ab9400df3 (diff) | |
download | qpid-python-ed5bad4e326394dfdc452f7df8c62d7beb8e21f1.tar.gz |
QPID-2970: Make performace tests multi-threaded
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1044979 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 260 insertions, 78 deletions
diff --git a/qpid/java/perftests/etc/dlq/Framework.sh b/qpid/java/perftests/etc/dlq/Framework.sh index f6db87e557..3704e1a593 100755 --- a/qpid/java/perftests/etc/dlq/Framework.sh +++ b/qpid/java/perftests/etc/dlq/Framework.sh @@ -18,8 +18,16 @@ # under the License. # +# Set Qpid Version +VERSION=0.5 + # Setup Java CLASSPATH -CLASSPATH=${QPID_HOME}/lib/qpid-all-${VERSION}.jar:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar +CLASSPATH=${QPID_HOME}/lib/qpid-all.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar # Run Performance Test Framework echo "Running DLQ Performance Tests" diff --git a/qpid/java/perftests/etc/dlq/Statistics.sh b/qpid/java/perftests/etc/dlq/Statistics.sh new file mode 100755 index 0000000000..1e615407de --- /dev/null +++ b/qpid/java/perftests/etc/dlq/Statistics.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Set Qpid Version +VERSION=0.5 + +# Setup Java CLASSPATH +CLASSPATH=${QPID_HOME}/lib/qpid-all.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar + +# Run Performance Test Framework +echo "Running DLQ Performance Tests" +java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceStatistics $* diff --git a/qpid/java/perftests/etc/dlq/Test.sh b/qpid/java/perftests/etc/dlq/Test.sh new file mode 100755 index 0000000000..713d729bda --- /dev/null +++ b/qpid/java/perftests/etc/dlq/Test.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Set Qpid Version +VERSION=0.5 + +# Setup Java CLASSPATH +CLASSPATH=${QPID_HOME}/lib/qpid-all.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar +CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar + +# Run Performance Test Framework +echo "Running DLQ Performance Tests" +java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceTest $* diff --git a/qpid/java/perftests/etc/dlq/config.properties b/qpid/java/perftests/etc/dlq/config.properties index 1d06339905..b1342158f8 100644 --- a/qpid/java/perftests/etc/dlq/config.properties +++ b/qpid/java/perftests/etc/dlq/config.properties @@ -4,22 +4,21 @@ repeat = 10 # shared properties -broker = tcp://magenta:5672 -#broker = tcp://localhost:5672 +broker = tcp://localhost:5672 maxRedelivery = 3 maxPrefetch = 1 session = SESSION_TRANSACTED queue = test count = 1000 persistent = true -maxRecords = 2000 +maxRecords = 10000 # producer properties size = 4096 messageIds = true ## consumer properties -threads = 1 +threads = 5 listener = false reject = 2 -rejectCount = 3
\ No newline at end of file +rejectCount = 3 diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java index c87ceae942..81517b6fbc 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java @@ -16,6 +16,14 @@ import org.apache.qpid.client.configuration.ClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Parent abstract class for all performance test clients that connect to a + * broker and perform test operations. All clients are {@link Callable} + * objects that return an integer value, or throw an exception. The + * {@link #connect()} method returns a boolean to indicate whether the + * broker connection succeeded, and can be used to abort tests if there is + * no available broker. + */ public abstract class Client implements Callable<Integer> { protected static final Logger _log = LoggerFactory.getLogger(Client.class); @@ -102,7 +110,7 @@ public abstract class Client implements Callable<Integer> throw new RuntimeException("session property not recognised: " + sessionType); } - public void connect() + public boolean connect() { String url = "amqp://guest:guest@" + _client + "/test?brokerlist='" + _broker + "'&maxprefetch='" + _maxPrefetch + "'&maxdeliverycount='" + _maxRedelivery + "'"; System.setProperty(ClientProperties.MAX_DELIVERY_RECORDS_PROP_NAME, Integer.toString(_maxRecords)); @@ -120,11 +128,12 @@ public abstract class Client implements Callable<Integer> System.exit(0); } }); + return true; } catch (Exception e) { _log.error("Unable to setup connection, client and producer on broker", e); - throw new RuntimeException(e); + return false; } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java index 1eb3c47983..f38e2c0888 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java @@ -11,11 +11,10 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.Session; import org.apache.qpid.perftests.dlq.test.PerformanceTest; - - public class Receiver extends Client { private MessageConsumer _consumer; @@ -23,11 +22,16 @@ public class Receiver extends Client private int _reject; private int _rejectCount; private Map<Integer, Integer> _rejected = new HashMap<Integer, Integer>(); + private int _receivedCount = 0; private static volatile boolean _stopped; private static CountDownLatch _finished; private static AtomicInteger _id; - private static AtomicInteger _received; + private static AtomicInteger _totalReceivedCount; + private static AtomicInteger _totalConsumedCount; + private static AtomicInteger _rejectedCount; + private static int _consumedCheck; + private static int _rejectedCheck; public Receiver(Properties props) { @@ -39,17 +43,25 @@ public class Receiver extends Client public static void reset() { _id = new AtomicInteger(0); - _received = new AtomicInteger(0); + _totalReceivedCount = new AtomicInteger(0); + _totalConsumedCount = new AtomicInteger(0); + _rejectedCount = new AtomicInteger(0); _finished = new CountDownLatch(1); _stopped = false; } + - public void start() throws Exception + public synchronized void start() throws Exception { _listener = Boolean.parseBoolean(_props.getProperty(LISTENER)); _reject = Integer.parseInt(_props.getProperty(REJECT)); _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT)); - + + boolean sessionOk = (_transacted || _clientAck) || + ((_sessionType == Session.AUTO_ACKNOWLEDGE || _sessionType == Session.DUPS_OK_ACKNOWLEDGE) && _listener); + _rejectedCheck = (!sessionOk || _messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject; + _consumedCheck = (_count - _rejectedCheck); // + (sessionOk ? ((_count / _reject) * _rejectCount) : 0); + _consumer = _session.createConsumer(_queue); _connection.start(); @@ -71,7 +83,10 @@ public class Receiver extends Client while (!_stopped) { Message msg = _consumer.receive(1000); - processMessage(msg); + if (msg != null) + { + processMessage(msg); + } } } @@ -79,7 +94,7 @@ public class Receiver extends Client { try { - _received.incrementAndGet(); + _totalReceivedCount.incrementAndGet(); int number = msg.getIntProperty("number"); if (number % 100 == 0) { @@ -90,17 +105,24 @@ public class Receiver extends Client if (rejectMessage) { int rejectCount = 0; - if (_rejected.containsKey(number)) + if (!_rejected.containsKey(number)) { - rejectCount = _rejected.get(number); + _rejected.put(number, 0); } - _rejected.put(number, ++rejectCount); + rejectCount = _rejected.get(number) + 1; + _rejected.put(number, rejectCount); if (rejectCount <= _rejectCount) { - if (rejectCount >= _maxRedelivery) + if (rejectCount == _maxRedelivery) { - _log.info("rejecting message (" + rejectCount + ") " + msg.getJMSMessageID()); + _rejectedCount.incrementAndGet(); + _log.info("client " + _client + " rejecting message (" + rejectCount + ") " + msg.getJMSMessageID()); } + if (rejectCount > _maxRedelivery) + { + throw new RuntimeException("client " + _client + " received message " + msg.getJMSMessageID() + + " " + rejectCount + " times"); + } if (_transacted) { _session.rollback(); @@ -118,6 +140,8 @@ public class Receiver extends Client if (!rejectMessage) { + _receivedCount++; + _totalConsumedCount.incrementAndGet(); if (_transacted) { _session.commit(); @@ -126,12 +150,14 @@ public class Receiver extends Client { msg.acknowledge(); } - if (number == (_count - 1)) - { - _stopped = true; - _finished.countDown(); - } } + + if (_totalConsumedCount.get() >= _consumedCheck && _rejectedCount.get() >= _rejectedCheck) + { + _log.info("stopping receivers after " + _totalConsumedCount.get() + " received and " + _rejectedCount.get() + " rejected"); + _stopped = true; + _finished.countDown(); + } } catch (Exception e) { @@ -156,6 +182,21 @@ public class Receiver extends Client _finished.await(); PerformanceTest.countDown(); - return _received.get(); + return _receivedCount; + } + + public static int getTotalReceivedCount() + { + return _totalReceivedCount.get(); + } + + public static int getConsumedCheck() + { + return _consumedCheck; + } + + public static int getRejectedCheck() + { + return _rejectedCheck; } }
\ No newline at end of file diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java index 3db257362c..0f29c1f5bf 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java @@ -8,7 +8,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; -import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; @@ -16,6 +15,10 @@ import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Run a series of different performance tests, based on a set of variations of + * configuration properties, and collect the results and generated statistics. + */ public class PerformanceFramework { private static final Logger _log = LoggerFactory.getLogger(PerformanceFramework.class); @@ -69,7 +72,10 @@ public class PerformanceFramework out.println(_id + "," + maxRedelivery + "," + rejectCount + "," + Boolean.toString(messageIds == 0) + "," + Boolean.toString(listener == 0) + "," + SESSION_VALUES[session]); - runOnce(_id); + if (!runOnce(_id)) + { + return; + } } } } @@ -77,21 +83,29 @@ public class PerformanceFramework } } - public void runOnce(int id) + public boolean runOnce(int id) { PerformanceStatistics stats = new PerformanceStatistics(_props); try { _log.info("starting test id " + id); String fileId = String.format("%04d", id); - stats.series(new File(_dir, fileId + "-series.csv")); - _log.info("test id " + id + " completed ok"); - stats.statistics(new File(_dir, fileId + "-statistics.csv")); + if (stats.series(new File(_dir, fileId + "-series.csv"))) + { + _log.info("test id " + id + " completed ok"); + stats.statistics(new File(_dir, fileId + "-statistics.csv")); + } + else + { + _log.error("connection failure, test series aborted"); + return false; + } } catch (Exception e) { _log.error("failed test id " + id + " with error", e); } + return true; } public static void main(String[] argv) throws Exception diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java index a3f6a3fcba..7f009dbc08 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java @@ -15,6 +15,10 @@ import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Run a series of performance tests, based on specific configuration properties, and + * collect the results to generate statistics. + */ public class PerformanceStatistics { private static final Logger _log = LoggerFactory.getLogger(PerformanceStatistics.class); @@ -22,6 +26,7 @@ public class PerformanceStatistics private Properties _props; private List<Double> _sent = new ArrayList<Double>(); private List<Double> _received = new ArrayList<Double>(); + private List<Double> _consumed = new ArrayList<Double>(); private List<Double> _rejected = new ArrayList<Double>(); private List<Double> _duration = new ArrayList<Double>(); private List<Double> _throughputIn = new ArrayList<Double>(); @@ -50,23 +55,31 @@ public class PerformanceStatistics _props = props; } - public void single(PrintStream out) throws Exception + public boolean single(PrintStream out) throws Exception { - PerformanceTest client = new PerformanceTest(_props); - client.test(); - client.check(out); - _sent.add(client.getSent()); - _received.add(client.getReceived()); - _rejected.add(client.getRejected()); - _duration.add(client.getDuration()); - _throughputIn.add(client.getThroughputIn()); - _throughputOut.add(client.getThroughputOut()); - _bandwidthIn.add(client.getBandwidthIn()); - _bandwidthOut.add(client.getBandwidthOut()); - _latency.add(client.getLatency()); + PerformanceTest test = new PerformanceTest(_props); + if (test.test()) + { + test.check(out); + _sent.add(test.getSent()); + _received.add(test.getTotalReceived()); + _consumed.add(test.getConsumed()); + _rejected.add(test.getRejected()); + _duration.add(test.getDuration()); + _throughputIn.add(test.getThroughputIn()); + _throughputOut.add(test.getThroughputOut()); + _bandwidthIn.add(test.getBandwidthIn()); + _bandwidthOut.add(test.getBandwidthOut()); + _latency.add(test.getLatency()); + return true; + } + else + { + return false; + } } - public void series(File file) throws Exception + public boolean series(File file) throws Exception { try { @@ -76,7 +89,10 @@ public class PerformanceStatistics for (int i = 0; i < repeat; i++) { _log.info("starting individual test run " + i); - single(out); + if (!single(out)) + { + return false; + } } } catch (Exception e) @@ -86,6 +102,7 @@ public class PerformanceStatistics _statistics.add(new Statistics(_sent, "sent")); _statistics.add(new Statistics(_received, "received")); + _statistics.add(new Statistics(_consumed, "consumed")); _statistics.add(new Statistics(_rejected, "rejected")); _statistics.add(new Statistics(_duration, "duration")); _statistics.add(new Statistics(_throughputIn, "throughputIn")); @@ -93,6 +110,7 @@ public class PerformanceStatistics _statistics.add(new Statistics(_bandwidthIn, "bandwidthIn")); _statistics.add(new Statistics(_bandwidthOut, "bandwidthOut")); _statistics.add(new Statistics(_latency, "latency")); + return true; } public void statistics(File file) @@ -126,7 +144,13 @@ public class PerformanceStatistics } PerformanceStatistics stats = new PerformanceStatistics(propertyFile); - stats.series(new File("series.csv")); - stats.statistics(new File("statistics.csv")); + if (stats.series(new File("series.csv"))) + { + stats.statistics(new File("statistics.csv")); + } + else + { + System.err.println("connection faulre, test series aborted"); + } } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java index a8c7e7a905..a3be10a697 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java @@ -16,13 +16,20 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.qpid.perftests.dlq.client.Check; +import org.apache.qpid.perftests.dlq.client.Client; import org.apache.qpid.perftests.dlq.client.Create; import org.apache.qpid.perftests.dlq.client.Receiver; import org.apache.qpid.perftests.dlq.client.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - +/** + * Run a single performance test, based on specific configuration properties. + */ public class PerformanceTest { + private static final Logger _log = LoggerFactory.getLogger(PerformanceFramework.class); + private static CountDownLatch _latch; private ExecutorService _executor; @@ -30,7 +37,7 @@ public class PerformanceTest private int _size = 0; private int _threads = 0; private int _sent = 0; - private int _received = 0; + private int _consumed = 0; private int _rejected = 0; private long _started = 0; private long _finished = 0; @@ -62,10 +69,14 @@ public class PerformanceTest _props = props; } - public void test() throws Exception + public boolean test() throws Exception { - Create create = new Create(_props); - create.connect(); + Client create = new Create(_props); + if (!create.connect()) + { + _log.error("initial connection failed"); + return false; + } create.call(); create.shutdown(); @@ -82,16 +93,16 @@ public class PerformanceTest _latch = new CountDownLatch(1); _started = System.nanoTime(); - Sender sender = new Sender(_props); + Client sender = new Sender(_props); sender.connect(); Future<Integer> send = _executor.submit(sender); Receiver.reset(); List<Future<Integer>> receives = new ArrayList<Future<Integer>>(); - List<Receiver> receivers = new ArrayList<Receiver>(); + List<Client> receivers = new ArrayList<Client>(); for (int i = 0; i < _threads; i++) { - Receiver receiver = new Receiver(_props); + Client receiver = new Receiver(_props); receiver.connect(); receivers.add(receiver); receives.add(_executor.submit(receiver)); @@ -104,10 +115,10 @@ public class PerformanceTest _sent = send.get(); for (Future<Integer> receive : receives) { - _received += receive.get(); + _consumed += receive.get(); } - Check check = new Check(_props); + Client check = new Check(_props); check.connect(); _rejected = check.call(); check.shutdown(); @@ -119,12 +130,13 @@ public class PerformanceTest finally { sender.shutdown(); - for (Receiver receiver : receivers) + for (Client receiver : receivers) { receiver.shutdown(); } _executor.shutdownNow(); } + return true; } public void check(PrintStream out) @@ -134,17 +146,13 @@ public class PerformanceTest { error.append("sent ").append(_sent).append(" not ").append(_count).append('\n'); } - boolean sessionOk = ((_session.equalsIgnoreCase(CLIENT_ACKNOWLEDGE)) || (_session.equalsIgnoreCase(SESSION_TRANSACTED)) || - ((_session.equalsIgnoreCase(AUTO_ACKNOWLEDGE) || _session.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE)) && _listener)); - int rejected = (!sessionOk || !_messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject; - if (_rejected != rejected) + if (_rejected != Receiver.getRejectedCheck()) { - error.append("rejected ").append(_rejected).append(" not ").append(rejected).append('\n'); + error.append("rejected ").append(_rejected).append(" not ").append(Receiver.getRejectedCheck()).append('\n'); } - int received = (_count - rejected) + (sessionOk ? ((_count / _reject) * _rejectCount) : 0); - if (_received != received) + if (_consumed != Receiver.getConsumedCheck()) { - error.append("received ").append(_received).append(" not ").append(received).append('\n'); + error.append("consumed ").append(_consumed).append(" not ").append(Receiver.getConsumedCheck()).append('\n'); } if (error.length() > 0) { @@ -158,12 +166,12 @@ public class PerformanceTest public static String getHeader() { - return "sent,received,rejected,duration"; + return "sent,received,consumed,rejected,duration"; } public String toString() { - String results = String.format("%d,%d,%d,%f", _sent, _received, _rejected, getDuration()); + String results = String.format("%d,%d,%d,%d,%f", _sent, Receiver.getTotalReceivedCount(), _consumed, _rejected, getDuration()); return results; } @@ -172,9 +180,14 @@ public class PerformanceTest return (double) _sent; } - public double getReceived() + public double getConsumed() + { + return (double) _consumed; + } + + public double getTotalReceived() { - return (double) _received; + return (double) Receiver.getTotalReceivedCount(); } public double getDuration() @@ -194,7 +207,7 @@ public class PerformanceTest public double getThroughputOut() { - return getReceived() / getDuration(); + return getTotalReceived() / getDuration(); } public double getBandwidthIn() @@ -204,12 +217,12 @@ public class PerformanceTest public double getBandwidthOut() { - return (getReceived() * (double) _size) / getDuration(); + return (getTotalReceived() * (double) _size) / getDuration(); } public double getLatency() { - return getDuration() / getReceived(); + return getDuration() / getTotalReceived(); } public static void countDown() @@ -230,8 +243,14 @@ public class PerformanceTest throw new RuntimeException("property file '" + propertyFile.getName() + "' must exist and be readable"); } PerformanceTest client = new PerformanceTest(propertyFile); - client.test(); - client.check(System.out); + if (client.test()) + { + client.check(System.out); + } + else + { + System.err.println("test connection failure"); + } } } |