summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java75
1 files changed, 47 insertions, 28 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
index a8c7e7a905..a3be10a697 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
@@ -16,13 +16,20 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.qpid.perftests.dlq.client.Check;
+import org.apache.qpid.perftests.dlq.client.Client;
import org.apache.qpid.perftests.dlq.client.Create;
import org.apache.qpid.perftests.dlq.client.Receiver;
import org.apache.qpid.perftests.dlq.client.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-
+/**
+ * Run a single performance test, based on specific configuration properties.
+ */
public class PerformanceTest
{
+ private static final Logger _log = LoggerFactory.getLogger(PerformanceFramework.class);
+
private static CountDownLatch _latch;
private ExecutorService _executor;
@@ -30,7 +37,7 @@ public class PerformanceTest
private int _size = 0;
private int _threads = 0;
private int _sent = 0;
- private int _received = 0;
+ private int _consumed = 0;
private int _rejected = 0;
private long _started = 0;
private long _finished = 0;
@@ -62,10 +69,14 @@ public class PerformanceTest
_props = props;
}
- public void test() throws Exception
+ public boolean test() throws Exception
{
- Create create = new Create(_props);
- create.connect();
+ Client create = new Create(_props);
+ if (!create.connect())
+ {
+ _log.error("initial connection failed");
+ return false;
+ }
create.call();
create.shutdown();
@@ -82,16 +93,16 @@ public class PerformanceTest
_latch = new CountDownLatch(1);
_started = System.nanoTime();
- Sender sender = new Sender(_props);
+ Client sender = new Sender(_props);
sender.connect();
Future<Integer> send = _executor.submit(sender);
Receiver.reset();
List<Future<Integer>> receives = new ArrayList<Future<Integer>>();
- List<Receiver> receivers = new ArrayList<Receiver>();
+ List<Client> receivers = new ArrayList<Client>();
for (int i = 0; i < _threads; i++)
{
- Receiver receiver = new Receiver(_props);
+ Client receiver = new Receiver(_props);
receiver.connect();
receivers.add(receiver);
receives.add(_executor.submit(receiver));
@@ -104,10 +115,10 @@ public class PerformanceTest
_sent = send.get();
for (Future<Integer> receive : receives)
{
- _received += receive.get();
+ _consumed += receive.get();
}
- Check check = new Check(_props);
+ Client check = new Check(_props);
check.connect();
_rejected = check.call();
check.shutdown();
@@ -119,12 +130,13 @@ public class PerformanceTest
finally
{
sender.shutdown();
- for (Receiver receiver : receivers)
+ for (Client receiver : receivers)
{
receiver.shutdown();
}
_executor.shutdownNow();
}
+ return true;
}
public void check(PrintStream out)
@@ -134,17 +146,13 @@ public class PerformanceTest
{
error.append("sent ").append(_sent).append(" not ").append(_count).append('\n');
}
- boolean sessionOk = ((_session.equalsIgnoreCase(CLIENT_ACKNOWLEDGE)) || (_session.equalsIgnoreCase(SESSION_TRANSACTED)) ||
- ((_session.equalsIgnoreCase(AUTO_ACKNOWLEDGE) || _session.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE)) && _listener));
- int rejected = (!sessionOk || !_messageIds || _maxRedelivery == 0 || _rejectCount < _maxRedelivery) ? 0 : _count / _reject;
- if (_rejected != rejected)
+ if (_rejected != Receiver.getRejectedCheck())
{
- error.append("rejected ").append(_rejected).append(" not ").append(rejected).append('\n');
+ error.append("rejected ").append(_rejected).append(" not ").append(Receiver.getRejectedCheck()).append('\n');
}
- int received = (_count - rejected) + (sessionOk ? ((_count / _reject) * _rejectCount) : 0);
- if (_received != received)
+ if (_consumed != Receiver.getConsumedCheck())
{
- error.append("received ").append(_received).append(" not ").append(received).append('\n');
+ error.append("consumed ").append(_consumed).append(" not ").append(Receiver.getConsumedCheck()).append('\n');
}
if (error.length() > 0)
{
@@ -158,12 +166,12 @@ public class PerformanceTest
public static String getHeader()
{
- return "sent,received,rejected,duration";
+ return "sent,received,consumed,rejected,duration";
}
public String toString()
{
- String results = String.format("%d,%d,%d,%f", _sent, _received, _rejected, getDuration());
+ String results = String.format("%d,%d,%d,%d,%f", _sent, Receiver.getTotalReceivedCount(), _consumed, _rejected, getDuration());
return results;
}
@@ -172,9 +180,14 @@ public class PerformanceTest
return (double) _sent;
}
- public double getReceived()
+ public double getConsumed()
+ {
+ return (double) _consumed;
+ }
+
+ public double getTotalReceived()
{
- return (double) _received;
+ return (double) Receiver.getTotalReceivedCount();
}
public double getDuration()
@@ -194,7 +207,7 @@ public class PerformanceTest
public double getThroughputOut()
{
- return getReceived() / getDuration();
+ return getTotalReceived() / getDuration();
}
public double getBandwidthIn()
@@ -204,12 +217,12 @@ public class PerformanceTest
public double getBandwidthOut()
{
- return (getReceived() * (double) _size) / getDuration();
+ return (getTotalReceived() * (double) _size) / getDuration();
}
public double getLatency()
{
- return getDuration() / getReceived();
+ return getDuration() / getTotalReceived();
}
public static void countDown()
@@ -230,8 +243,14 @@ public class PerformanceTest
throw new RuntimeException("property file '" + propertyFile.getName() + "' must exist and be readable");
}
PerformanceTest client = new PerformanceTest(propertyFile);
- client.test();
- client.check(System.out);
+ if (client.test())
+ {
+ client.check(System.out);
+ }
+ else
+ {
+ System.err.println("test connection failure");
+ }
}
}