summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-02-02 15:09:08 +0000
committerRobert Greig <rgreig@apache.org>2007-02-02 15:09:08 +0000
commit3393bb42ebc8f802988f98848e64b1bec508c5bb (patch)
tree853ed3f15e4c311a97d8ef3781211d0ca6f42440
parent6aef63d9078abd7f96b79e70ede242fa7056da37 (diff)
downloadqpid-python-3393bb42ebc8f802988f98848e64b1bec508c5bb.tar.gz
(Submitted by Rupert Smith)
Perftests improved with better timeout handling. Shared/unique destinations to ping now an option. TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchas all threads, then runs tear downs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@502620 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jarbin69140 -> 0 bytes
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md51
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha11
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md51
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha11
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jarbin0 -> 73370 bytes
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md51
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha11
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom (renamed from java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom)2
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md51
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha11
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml4
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md52
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha12
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml2
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md52
-rw-r--r--java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha12
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java7
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java118
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java6
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java87
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java4
22 files changed, 157 insertions, 89 deletions
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar
deleted file mode 100644
index d05e42daa0..0000000000
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar
+++ /dev/null
Binary files differ
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5
deleted file mode 100644
index fe9db298fb..0000000000
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5
+++ /dev/null
@@ -1 +0,0 @@
-b54c1911c914f460a9a816cadd64c787 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1
deleted file mode 100644
index 6d93d0b731..0000000000
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-f150a7450eca6303c94fdad42b59114d6a7d6708 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5
deleted file mode 100644
index 73d40e036e..0000000000
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5
+++ /dev/null
@@ -1 +0,0 @@
-7f48b1816a77acb427ad5dcb2b1eb5bb \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1
deleted file mode 100644
index dec86a3286..0000000000
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1
+++ /dev/null
@@ -1 +0,0 @@
-5fece706dc17a5d30acad3135ea67f6b9229a6d4 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar
new file mode 100644
index 0000000000..b5c0129ea9
--- /dev/null
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar
Binary files differ
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5
new file mode 100644
index 0000000000..2f9b7922bd
--- /dev/null
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5
@@ -0,0 +1 @@
+ce27581a94a89b664830f3c355dd7bf5 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1
new file mode 100644
index 0000000000..3c17b5a8e4
--- /dev/null
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1
@@ -0,0 +1 @@
+ffecddfd23345c7fb4177ab1d89cf73a4fe7adc9 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom
index 7fb4254661..a2f72deff6 100644
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom
@@ -3,7 +3,7 @@
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
<name>junit-toolkit</name>
- <version>0.5-20070131.112634-1</version>
+ <version>0.5-20070202.132554-1</version>
<description>JUnit Toolkit enhances JUnit with performance testing, asymptotic behaviour analysis, and concurrency testing.</description>
<url>http://www.thebadgerset.co.uk/junit-toolkit</url>
<developers>
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5
new file mode 100644
index 0000000000..ec5938d266
--- /dev/null
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5
@@ -0,0 +1 @@
+36d35e778356cef8a984a021d9bc0fe4 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1
new file mode 100644
index 0000000000..f889d5fda6
--- /dev/null
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1
@@ -0,0 +1 @@
+9383e1d89168d83973f47595063a35b733a3854d \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
index 216800cda7..42bb1afc36 100644
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
@@ -4,9 +4,9 @@
<version>0.5-SNAPSHOT</version>
<versioning>
<snapshot>
- <timestamp>20070131.112634</timestamp>
+ <timestamp>20070202.132554</timestamp>
<buildNumber>1</buildNumber>
</snapshot>
- <lastUpdated>20070131112634</lastUpdated>
+ <lastUpdated>20070202132554</lastUpdated>
</versioning>
</metadata> \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
index c355e8f9be..2d29a790e5 100644
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
@@ -1 +1 @@
-59aa948e74a22a370cc414718864f65f \ No newline at end of file
+277e07c561ec6eebda9d1a470a2ce6a4 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
index 5d9fbad25f..0b047b645a 100644
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
@@ -1 +1 @@
-f2350021d6d3c2091d35d074b6ecd312d6f91657 \ No newline at end of file
+155e5e47c236cefb5668414e81cc485867bdb93e \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
index 4ee3d61cca..76b0c44645 100644
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
@@ -6,6 +6,6 @@
<versions>
<version>0.5-SNAPSHOT</version>
</versions>
- <lastUpdated>20070131112634</lastUpdated>
+ <lastUpdated>20070202132554</lastUpdated>
</versioning>
</metadata> \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
index 39d7b84744..a57795d6c0 100644
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
@@ -1 +1 @@
-260c92993eec99c2c2c0217cf6241842 \ No newline at end of file
+77e2a30606515c4f52bcb466c3966f63 \ No newline at end of file
diff --git a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
index bc3857fa21..6b612f5d27 100644
--- a/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
+++ b/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
@@ -1 +1 @@
-c8803f171049b2bb3e258f78b8c9490dec3074c4 \ No newline at end of file
+111f4320859e398337dbf4e396ec2fad1e5aa8dd \ No newline at end of file
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
index 7fd91ca39d..0e3d753fea 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
@@ -63,18 +63,19 @@ public class PingClient extends PingPongProducer
* @param noOfDestinations The number of destinations to ping. Must be 1 or more.
* @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
* possible, with no rate restriction.
- * @param pubsub
+ * @param pubsub True to ping topics, false to ping queues.
+ * @param unique True to use unique destinations for each ping pong producer, false to share.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int txBatchSize, int noOfDestinations, int rate, boolean pubsub) throws Exception
+ int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique) throws Exception
{
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub);
+ pubsub, unique);
}
/**
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index ce64a40217..75e49d9e36 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -34,13 +34,12 @@ import javax.jms.*;
import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.topic.Config;
@@ -78,11 +77,9 @@ import uk.co.thebadgerset.junit.extensions.Throttle;
* the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
* tests.
*
- * @todo Make shared or unique destinations a configurable option, hard coded to false.
- *
* @todo Make acknowledege mode a test option.
*
- * @todo Make the message listener a static for all replies to be sent to. It won't be any more of a bottle neck than
+ * @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than
* having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
* messages concurrently for different ids. Needs to be static so that when using a chained message listener and
* shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
@@ -94,12 +91,6 @@ import uk.co.thebadgerset.junit.extensions.Throttle;
* that the last message waits until all other messages have been handled before releasing producers but allows
* messages to be processed concurrently, unlike the current synchronized block.
*
- * @todo Set the timeout to be per message correlation id. Restart it every time a message is received (with matching id).
- * Means that timeout is measuring situations whether a particular ping stream has pasued for too long, rather than
- * the time to send an entire block of messages. This will be better because the timeout won't need to be adjusted
- * depending on the total number of messages being sent. Logic to be added to sendAndWait to recheck the timeout
- * whenever its wait expires.
- *
* @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers?
*/
public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
@@ -155,6 +146,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize";
+ public static final String UNIQUE_PROPNAME = "uniqueDests";
+
/** Used to set up a default message size. */
public static final int DEFAULT_MESSAGE_SIZE = 0;
@@ -171,7 +164,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final long DEFAULT_SLEEP_TIME = 250;
/** Default time to wait before assuming that a ping has timed out. */
- public static final long DEFAULT_TIMEOUT = 9000;
+ public static final long DEFAULT_TIMEOUT = 30000;
/** Defines the default number of pings to send in each transaction when running transactionally. */
public static final int DEFAULT_TX_BATCH_SIZE = 100;
@@ -227,18 +220,25 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Holds the default verbose mode. */
public static final boolean DEFAULT_VERBOSE = false;
+ public static final boolean DEFAULT_UNIQUE = true;
+
/** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
- private static AtomicLong idGenerator = new AtomicLong(0L);
+ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
+
+ /** A source for providing unique ids to PingPongProducer. */
+ private static AtomicInteger _pingProducerIdGenerator;
/**
* Holds a map from message ids to latches on which threads wait for replies. This map is shared accross
* multiple ping producers on the same JVM.
*/
- private static Map<String, CountDownLatch> trafficLights =
- Collections.synchronizedMap(new HashMap<String, CountDownLatch>());
+ /*private static Map<String, CountDownLatch> trafficLights =
+ Collections.synchronizedMap(new HashMap<String, CountDownLatch>());*/
+ private static Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
@@ -337,7 +337,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @param noOfDestinations The number of destinations to ping. Must be 1 or more.
* @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
* possible, with no rate restriction.
- * @param pubsub
+ * @param pubsub True to ping topics, false to ping queues.
+ * @param unique True to use unique destinations for each ping pong producer, false to share.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
@@ -345,8 +346,18 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub) throws Exception
+ boolean pubsub, boolean unique) throws Exception
{
+ _logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
+ + ", String password = " + password + ", String virtualpath = " + virtualpath
+ + ", String destinationName = " + destinationName + ", String selector = " + selector
+ + ", boolean transacted = " + transacted + ", boolean persistent = " + persistent
+ + ", int messageSize = " + messageSize + ", boolean verbose = " + verbose + ", boolean afterCommit = "
+ + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
+ + ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
+ + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
+ + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called");
+
// Check that one or more destinations were specified.
if (noOfDestinations < 1)
{
@@ -375,7 +386,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Create the producer and the consumers for all reply destinations.
createProducer();
- createPingDestinations(noOfDestinations, selector, destinationName, true);
+ createPingDestinations(noOfDestinations, selector, destinationName, unique);
createReplyConsumers(getReplyDestinations(), selector);
// Keep all the remaining options.
@@ -470,7 +481,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
PingPongProducer pingProducer =
new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub);
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
pingProducer.getConnection().start();
@@ -605,10 +616,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_logger.debug("correlationID = " + correlationID);
// Countdown on the traffic light if there is one for the matching correlation id.
- CountDownLatch trafficLight = trafficLights.get(correlationID);
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
- if (trafficLight != null)
+ if (perCorrelationId != null)
{
+ CountDownLatch trafficLight = perCorrelationId.trafficLight;
+
+ // Restart the timeout timer on every message.
+ perCorrelationId.timeOutStart = System.nanoTime();
+
_logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
@@ -650,7 +666,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
else
{
- _logger.debug("There was no thread waiting for reply: " + correlationID);
+ _logger.warn("Got unexpected message with correlationId: " + correlationID);
}
// Print out ping times for every message in verbose mode only.
@@ -693,7 +709,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
+ timeout + "): called");
// Create a unique correlation id to put on the messages before sending them.
- String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
+ String messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
}
@@ -726,17 +742,42 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// One is added to this, so that the last reply becomes a special case. The special case is that the
// chained message listener must be called before this sender can be unblocked, but that decrementing the
// countdown needs to be done before the chained listener can be called.
- CountDownLatch trafficLight = new CountDownLatch(numPings + 1);
- trafficLights.put(messageCorrelationId, trafficLight);
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+ perCorrelationId.trafficLight = new CountDownLatch(numPings + 1);
+ perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+ // Set up the current time as the start time for pinging on the correlation id. This is used to determine
+ // timeouts.
+ perCorrelationId.timeOutStart = System.nanoTime();
// Send the specifed number of messages.
pingNoWaitForReply(message, numPings, messageCorrelationId);
- // Block the current thread until replies to all the message are received, or it times out.
- trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+ boolean timedOut = false;
+ boolean allMessagesReceived = false;
+ int numReplies = 0;
+
+ do
+ {
+ // Block the current thread until replies to all the messages are received, or it times out.
+ perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+
+ // Work out how many replies were receieved.
+ numReplies = numPings - (int) perCorrelationId.trafficLight.getCount();
+ allMessagesReceived = numReplies >= numPings;
+
+ _logger.debug("numReplies = "+ numReplies);
+ _logger.debug("allMessagesReceived = "+ allMessagesReceived);
+
+ // Recheck the timeout condition.
+ long now = System.nanoTime();
+ long lastMessageReceievedAt = perCorrelationId.timeOutStart;
+ timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
- // Work out how many replies were receieved.
- int numReplies = numPings - (int) trafficLight.getCount();
+ _logger.debug("now = " + now);
+ _logger.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+ }
+ while (!timedOut && !allMessagesReceived);
if ((numReplies < numPings) && _verbose)
{
@@ -757,7 +798,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// so will be a memory leak if this is not done.
finally
{
- trafficLights.remove(messageCorrelationId);
+ perCorrelationIds.remove(messageCorrelationId);
}
}
@@ -1148,4 +1189,17 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
public void onMessage(Message message, int remainingCount) throws JMSException;
}
+
+ /**
+ * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be
+ * added to this: read/write lock to make onMessage more concurrent as described in class header comment.
+ */
+ protected static class PerCorrelationId
+ {
+ /** Holds a countdown on number of expected messages. */
+ CountDownLatch trafficLight;
+
+ /** Holds the last timestamp that the timeout was reset to. */
+ Long timeOutStart;
+ }
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 27de27faf3..f61516fc5a 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -250,14 +250,16 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
*/
public void onMessage(Message message, int remainingCount) throws JMSException
{
- _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
-
// Check if a batch boundary has been crossed.
if ((remainingCount % _batchSize) == 0)
{
// Extract the correlation id from the message.
String correlationId = message.getJMSCorrelationID();
+ _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount +
+ "): called on batch boundary for message id: "
+ + correlationId + " with thread id: " + Thread.currentThread().getId());
+
// Get the details for the correlation id and check that they are not null. They can become null
// if a test times out.
PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index c4e72f4bb6..0d0df0128e 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -72,41 +72,40 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
super(name);
// Sets up the test parameters with defaults.
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
- Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
- Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
- Integer.toString(PingPongProducer.DEFAULT_RATE));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
- Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
- Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
- Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
- ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
+ testParameters.setPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+ testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+ testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+ testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+ testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+ testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+ testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+ testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+ testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+ testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.DEFAULT_RATE));
+ testParameters.setPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+ testParameters.setPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+ testParameters.setPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
+ testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, PingPongProducer.DEFAULT_UNIQUE);
}
/**
@@ -125,11 +124,17 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
public void testPingOk(int numPings) throws Exception
{
+ if (numPings == 0)
+ {
+ Assert.fail("Number of pings requested was zero.");
+ }
+
// Get the per thread test setup to run the test through.
PerThreadSetup perThreadSetup = threadSetup.get();
- if (numPings == 0)
+
+ if (perThreadSetup == null)
{
- _logger.error("Number of pings requested was zero.");
+ Assert.fail("Could not get per thread test setup, it was null.");
}
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
@@ -182,6 +187,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
+ boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
// Extract the test set up paramaeters.
int destinationscount =
@@ -195,7 +201,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
selector, transacted, persistent, messageSize, verbose,
failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, destinationscount, rate, pubsub);
+ failOnce, batchSize, destinationscount, rate, pubsub, unique);
}
// Start the client connection
perThreadSetup._pingClient.getConnection().start();
@@ -224,7 +230,10 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
// Close the pingers so that it cleans up its connection cleanly.
synchronized (this)
{
- perThreadSetup._pingClient.close();
+ if ((perThreadSetup != null) && (perThreadSetup._pingClient != null))
+ {
+ perThreadSetup._pingClient.close();
+ }
}
// Ensure the per thread fixture is reclaimed.
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
index 81967d332a..3a89b1044e 100644
--- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -110,6 +110,7 @@ public class PingPongTestPerf extends AsymptoticTestCase
ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, Boolean.toString(PingPongProducer.DEFAULT_UNIQUE));
}
/**
@@ -186,6 +187,7 @@ public class PingPongTestPerf extends AsymptoticTestCase
boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
+ boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
synchronized (this)
{
@@ -203,7 +205,7 @@ public class PingPongTestPerf extends AsymptoticTestCase
destinationName, selector, transacted, persistent,
messageSize, verbose, failAfterCommit,
failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, 0, rate, pubsub);
+ failOnce, batchSize, 0, rate, pubsub, unique);
perThreadSetup._testPingProducer.getConnection().start();
}