From a88cefc2e0df7b7a1d9119296d168508d602ee61 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Fri, 12 Oct 2007 10:55:02 +0000 Subject: Merged revisions 584124 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r584124 | rupertlssmith | 2007-10-12 11:52:52 +0100 (Fri, 12 Oct 2007) | 1 line Implemented fair scheduling of producers in tests to prevent starvation and test timeout. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@584125 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/requestreply/PingPongProducer.java | 43 ++++++++++++---------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 5e1f35053a..99ed9f8367 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -46,6 +46,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -492,6 +493,17 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** Keeps count of the total messages sent purely for debugging purposes. */ private static AtomicInteger numSent = new AtomicInteger(); + /** + * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected + * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a + * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an + * equal chance to produce messages. + */ + static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); + + /** Keeps a count of the number of message currently sent but not received. */ + static AtomicInteger _unreceived = new AtomicInteger(0); + /** * Creates a ping producer with the specified parameters, of which there are many. See the class level comments * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on @@ -889,7 +901,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti if ((_maxPendingSize > 0)) { // Decrement the count of sent but not yet received messages. - int unreceived = perCorrelationId._unreceived.decrementAndGet(); + int unreceived = _unreceived.decrementAndGet(); int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1); @@ -897,13 +909,13 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // log.debug("unreceived = " + unreceived); // log.debug("unreceivedSize = " + unreceivedSize); - synchronized (perCorrelationId._sendPauseMonitor) + // synchronized (_sendPauseMonitor) + // { + if (unreceivedSize < _maxPendingSize) { - if (unreceivedSize < _maxPendingSize) - { - perCorrelationId._sendPauseMonitor.notify(); - } + _sendPauseMonitor.poll(); } + // } } // Decrement the countdown latch. Before this point, it is possible that two threads might enter this @@ -1167,7 +1179,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // If necessary, wait until the max pending message size comes within its limit. if (_maxPendingSize > 0) { - synchronized (perCorrelationId._sendPauseMonitor) + synchronized (_sendPauseMonitor) { // Used to keep track of the number of times that send has to wait. int numWaits = 0; @@ -1179,7 +1191,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti while (true) { // Get the size estimate of sent but not yet received messages. - int unreceived = perCorrelationId._unreceived.get(); + int unreceived = _unreceived.get(); int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / (_isPubSub ? getConsumersPerDestination() : 1); @@ -1207,7 +1219,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti try { long start = System.nanoTime(); - perCorrelationId._sendPauseMonitor.wait(10000); + // _sendPauseMonitor.wait(10000); + _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); long end = System.nanoTime(); // Count the wait only if it was for > 99% of the requested wait time. @@ -1250,8 +1263,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // in pub/sub mode. if (_maxPendingSize > 0) { - int newUnreceivedCount = - perCorrelationId._unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); // log.debug("newUnreceivedCount = " + newUnreceivedCount); } @@ -1672,14 +1684,5 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** Holds the last timestamp that the timeout was reset to. */ Long timeOutStart; - - /** - * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected - * to wait until the number of unreceived message is reduced before continuing to send. - */ - final Object _sendPauseMonitor = new Object(); - - /** Keeps a count of the number of message currently sent but not received. */ - AtomicInteger _unreceived = new AtomicInteger(0); } } -- cgit v1.2.1