summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-10-03 16:28:38 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-10-03 16:28:38 +0000
commitdf0cc12065d26d953bb7bd7f603ee0a59cfe3939 (patch)
tree78b5d55be4758070cad6e7226447fba5bc976ba7
parent34b1fef825e94c6ee378206f92634f3843e75723 (diff)
downloadqpid-python-df0cc12065d26d953bb7bd7f603ee0a59cfe3939.tar.gz
Performance enhancements for the tests, producers stalled individually above maxPending size.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@581647 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java252
2 files changed, 142 insertions, 112 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index edbc311bd5..06081e6ebf 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -29,11 +29,9 @@ import org.apache.qpid.requestreply.PingPongProducer;
import uk.co.thebadgerset.junit.extensions.TimingController;
import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.ObjectMessage;
import java.util.Collections;
import java.util.HashMap;
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index bca67bb0ce..bf1d9aba4a 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -423,14 +423,20 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
* to wait until the number of unreceived message is reduced before continuing to send.
*/
- protected static final Object _sendPauseMonitor = new Object();
+ protected final Object _sendPauseMonitor = new Object();
/** Keeps a count of the number of message currently sent but not received. */
- protected static AtomicInteger _unreceived = new AtomicInteger(0);
+ protected AtomicInteger _unreceived = new AtomicInteger(0);
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
+ /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */
+ private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
+
+ /** Holds this instances unique id. */
+ private int instanceId;
+
/**
* Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
* ping producers on the same JVM.
@@ -507,6 +513,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
public PingPongProducer(Properties overrides) throws Exception
{
// log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+ instanceId = _instanceIdGenerator.getAndIncrement();
// Create a set of parsed properties from the defaults overriden by the passed in values.
ParsedProperties properties = new ParsedProperties(defaults);
@@ -814,9 +821,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ ", String selector = " + selector + "): called");*/
- // log.debug("There are " + destinations.size() + " destinations.");
- // log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
- // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
+ log.debug("There are " + destinations.size() + " destinations.");
+ log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
+ log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
for (Destination destination : destinations)
{
@@ -839,7 +846,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
});
- // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
+ log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
}
}
}
@@ -861,7 +868,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
long timestamp = getTimestamp(message);
long pingTime = now - timestamp;
- NDC.push("cons" + consumerNo);
+ // NDC.push("id" + instanceId + "/cons" + consumerNo);
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
@@ -887,38 +894,41 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
- // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
- // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
- // ensures that each thread will get a unique value for the remaining messages.
- long trueCount;
- long remainingCount;
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
- synchronized (trafficLight)
+ // Release waiting senders if there are some and using maxPending limit.
+ if ((_maxPendingSize > 0))
{
- trafficLight.countDown();
-
- trueCount = trafficLight.getCount();
- remainingCount = trueCount - 1;
-
// Decrement the count of sent but not yet received messages.
int unreceived = _unreceived.decrementAndGet();
int unreceivedSize =
(unreceived * ((_messageSize == 0) ? 1 : _messageSize))
/ (_isPubSub ? getConsumersPerDestination() : 1);
- // log.debug("unreceived = " + unreceived);
- // log.debug("unreceivedSize = " + unreceivedSize);
-
- // Release a waiting sender if there is one.
synchronized (_sendPauseMonitor)
{
- if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
+ if (unreceivedSize < _maxPendingSize)
{
_sendPauseMonitor.notify();
}
}
+ }
+
+ // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+ // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for the remaining messages.
+ long trueCount;
+ long remainingCount;
- NDC.push("/rem" + remainingCount);
+ synchronized (trafficLight)
+ {
+ trafficLight.countDown();
+
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
+
+ // NDC.push("/rem" + remainingCount);
// log.debug("remainingCount = " + remainingCount);
// log.debug("trueCount = " + trueCount);
@@ -1069,7 +1079,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// commitTx(_consumerSession);
- // //log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+ // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
@@ -1146,109 +1156,131 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
protected boolean sendMessage(int i, Message message) throws JMSException
{
- // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
- // log.debug("_txBatchSize = " + _txBatchSize);
-
- // Round robin the destinations as the messages are sent.
- Destination destination = _pingDestinations.get(i % _pingDestinations.size());
-
- // Prompt the user to kill the broker when doing failover testing.
- _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
-
- // If necessary, wait until the max pending message size comes within its limit.
- synchronized (_sendPauseMonitor)
+ try
{
- // Used to keep track of the number of times that send has to wait.
- int numWaits = 0;
+ NDC.push("id" + instanceId + "/prod");
- // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
- // the test timeout.
- int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+ // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
+ // log.debug("_txBatchSize = " + _txBatchSize);
- while ((_maxPendingSize > 0))
- {
- // Get the size estimate of sent but not yet received messages.
- int unreceived = _unreceived.get();
- int unreceivedSize =
- (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1);
+ // Round robin the destinations as the messages are sent.
+ Destination destination = _pingDestinations.get(i % _pingDestinations.size());
- // log.debug("unreceived = " + unreceived);
- // log.debug("unreceivedSize = " + unreceivedSize);
- // log.debug("_maxPendingSize = " + _maxPendingSize);
+ // Prompt the user to kill the broker when doing failover testing.
+ _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
- if (unreceivedSize > _maxPendingSize)
+ // If necessary, wait until the max pending message size comes within its limit.
+ if (_maxPendingSize > 0)
+ {
+ synchronized (_sendPauseMonitor)
{
- // log.debug("unreceived size estimate over limit = " + unreceivedSize);
+ // Used to keep track of the number of times that send has to wait.
+ int numWaits = 0;
- // Wait on the send pause barrier for the limit to be re-established.
- try
- {
- _sendPauseMonitor.wait(10000);
- numWaits++;
- }
- catch (InterruptedException e)
- {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
+ // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
+ // the test timeout.
+ int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
- // Fail the test if the send has had to wait more than the maximum allowed number of times.
- if (numWaits >= waitLimit)
+ while (true)
{
- String errorMessage =
- "Send has had to wait for the unreceivedSize (" + unreceivedSize
- + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
- + " times.";
- log.warn(errorMessage);
- throw new RuntimeException(errorMessage);
+ // Get the size estimate of sent but not yet received messages.
+ int unreceived = _unreceived.get();
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
+ / (_isPubSub ? getConsumersPerDestination() : 1);
+
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+ // log.debug("_maxPendingSize = " + _maxPendingSize);
+
+ if (unreceivedSize > _maxPendingSize)
+ {
+ // log.debug("unreceived size estimate over limit = " + unreceivedSize);
+
+ // Fail the test if the send has had to wait more than the maximum allowed number of times.
+ if (numWaits > waitLimit)
+ {
+ String errorMessage =
+ "Send has had to wait for the unreceivedSize (" + unreceivedSize
+ + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
+ + " times.";
+ log.warn(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ try
+ {
+ long start = System.nanoTime();
+ _sendPauseMonitor.wait(10000);
+ long end = System.nanoTime();
+
+ // Count the wait only if it was for > 99% of the requested wait time.
+ if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99)
+ {
+ numWaits++;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ {
+ break;
+ }
}
}
- else
- {
- break;
- }
}
- }
- // Send the message either to its round robin destination, or its default destination.
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
- setTimestamp(message);
+ // Send the message either to its round robin destination, or its default destination.
+ // int num = numSent.incrementAndGet();
+ // message.setIntProperty("MSG_NUM", num);
+ setTimestamp(message);
- if (destination == null)
- {
- _producer.send(message);
- }
- else
- {
- _producer.send(destination, message);
- }
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
- // Increase the unreceived size, this may actually happen after the message is received.
- // The unreceived size is incremented by the number of consumers that will get a copy of the message,
- // in pub/sub mode.
- // _unreceived.getAndIncrement();
- /*int newUnreceivedCount =*/ _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
- // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+ // Increase the unreceived size, this may actually happen after the message is received.
+ // The unreceived size is incremented by the number of consumers that will get a copy of the message,
+ // in pub/sub mode.
+ if (_maxPendingSize > 0)
+ {
+ int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+ // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+ }
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
- {
- _rateLimiter.throttle();
- }
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
- // Call commit every time the commit batch size is reached.
- boolean committed = false;
+ // Call commit every time the commit batch size is reached.
+ boolean committed = false;
+
+ // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
+ if (((i + 1) % _txBatchSize) == 0)
+ {
+ // log.debug("Trying commit on producer session.");
+ committed = commitTx(_producerSession);
+ }
- // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
- if (((i + 1) % _txBatchSize) == 0)
+ return committed;
+ }
+ finally
{
- // log.debug("Trying commit on producer session.");
- committed = commitTx(_producerSession);
+ NDC.clear();
}
-
- return committed;
}
/**
@@ -1269,7 +1301,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
failFlag = false;
}
- // log.trace("Failing Before Send");
+ // log.debug("Failing Before Send");
waitForUser(KILL_BROKER_PROMPT);
}
@@ -1524,7 +1556,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
{
_failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);
- // long start = System.nanoTime();
+ long start = System.nanoTime();
session.commit();
committed = true;
// log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");