summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-10-12 10:55:02 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-10-12 10:55:02 +0000
commita88cefc2e0df7b7a1d9119296d168508d602ee61 (patch)
treec8e7abbd6d12baa99fcbb371c1881930f35c64d4
parentd83cea6de3801f00d80836bf8198364db3351b00 (diff)
downloadqpid-python-a88cefc2e0df7b7a1d9119296d168508d602ee61.tar.gz
Merged revisions 584124 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r584124 | rupertlssmith | 2007-10-12 11:52:52 +0100 (Fri, 12 Oct 2007) | 1 line Implemented fair scheduling of producers in tests to prevent starvation and test timeout. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@584125 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java43
1 files changed, 23 insertions, 20 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 5e1f35053a..99ed9f8367 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -46,6 +46,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -493,6 +494,17 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
private static AtomicInteger numSent = new AtomicInteger();
/**
+ * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
+ * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a
+ * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an
+ * equal chance to produce messages.
+ */
+ static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true);
+
+ /** Keeps a count of the number of message currently sent but not received. */
+ static AtomicInteger _unreceived = new AtomicInteger(0);
+
+ /**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
* it, to send and recieve its pings and replies on.
@@ -889,7 +901,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
if ((_maxPendingSize > 0))
{
// Decrement the count of sent but not yet received messages.
- int unreceived = perCorrelationId._unreceived.decrementAndGet();
+ int unreceived = _unreceived.decrementAndGet();
int unreceivedSize =
(unreceived * ((_messageSize == 0) ? 1 : _messageSize))
/ (_isPubSub ? getConsumersPerDestination() : 1);
@@ -897,13 +909,13 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// log.debug("unreceived = " + unreceived);
// log.debug("unreceivedSize = " + unreceivedSize);
- synchronized (perCorrelationId._sendPauseMonitor)
+ // synchronized (_sendPauseMonitor)
+ // {
+ if (unreceivedSize < _maxPendingSize)
{
- if (unreceivedSize < _maxPendingSize)
- {
- perCorrelationId._sendPauseMonitor.notify();
- }
+ _sendPauseMonitor.poll();
}
+ // }
}
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
@@ -1167,7 +1179,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// If necessary, wait until the max pending message size comes within its limit.
if (_maxPendingSize > 0)
{
- synchronized (perCorrelationId._sendPauseMonitor)
+ synchronized (_sendPauseMonitor)
{
// Used to keep track of the number of times that send has to wait.
int numWaits = 0;
@@ -1179,7 +1191,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
while (true)
{
// Get the size estimate of sent but not yet received messages.
- int unreceived = perCorrelationId._unreceived.get();
+ int unreceived = _unreceived.get();
int unreceivedSize =
(unreceived * ((_messageSize == 0) ? 1 : _messageSize))
/ (_isPubSub ? getConsumersPerDestination() : 1);
@@ -1207,7 +1219,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
try
{
long start = System.nanoTime();
- perCorrelationId._sendPauseMonitor.wait(10000);
+ // _sendPauseMonitor.wait(10000);
+ _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS);
long end = System.nanoTime();
// Count the wait only if it was for > 99% of the requested wait time.
@@ -1250,8 +1263,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// in pub/sub mode.
if (_maxPendingSize > 0)
{
- int newUnreceivedCount =
- perCorrelationId._unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+ int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
// log.debug("newUnreceivedCount = " + newUnreceivedCount);
}
@@ -1672,14 +1684,5 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Holds the last timestamp that the timeout was reset to. */
Long timeOutStart;
-
- /**
- * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
- * to wait until the number of unreceived message is reduced before continuing to send.
- */
- final Object _sendPauseMonitor = new Object();
-
- /** Keeps a count of the number of message currently sent but not received. */
- AtomicInteger _unreceived = new AtomicInteger(0);
}
}