summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java94
1 files changed, 58 insertions, 36 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 1b4fa6b779..d5d1c304e9 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -98,7 +98,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
* 3 - DUPS_OK_ACKNOWLEDGE
* 257 - NO_ACKNOWLEDGE
* 258 - PRE_ACKNOWLEDGE
- * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages send but not yet received.
+ * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
* Limits the volume of messages currently buffered on the client
* or broker. Can help scale test clients by limiting amount of buffered
* data to avoid out of memory errors.
@@ -373,10 +373,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
protected int _maxPendingSize;
/**
- * Holds a cyclic barrier which is used to synchronize sender and receiver threads, where the sender has elected
+ * Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected
* to wait until the number of unreceived message is reduced before continuing to send.
*/
- protected CyclicBarrier _sendPauseBarrier = new CyclicBarrier(2);
+ protected Object _sendPauseMonitor = new Object();
/** Keeps a count of the number of message currently sent but not received. */
protected AtomicInteger _unreceived = new AtomicInteger(0);
@@ -801,23 +801,27 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
// Release a waiting sender if there is one.
- if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)
- && (_sendPauseBarrier.getNumberWaiting() == 1))
+ synchronized (_sendPauseMonitor)
{
- log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- try
- {
- _sendPauseBarrier.await();
- }
- catch (InterruptedException e)
+ if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
+ // && (_sendPauseBarrier.getNumberWaiting() == 1))
{
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
+ log.debug("unreceived size estimate under limit = " + unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ /*try
+ {*/
+ // _sendPauseBarrier.await();
+ _sendPauseMonitor.notify();
+ /*}
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }*/
}
}
@@ -1052,26 +1056,40 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
waitForUser(KILL_BROKER_PROMPT);
}
- // Increase the count of sent but not yet received messages.
- int unreceived = _unreceived.getAndIncrement();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
-
- if ((_maxPendingSize > 0) && (unreceivedSize > _maxPendingSize))
+ // If necessary, wait until the max pending message size comes within its limit.
+ synchronized (_sendPauseMonitor)
{
- log.debug("unreceived size estimate over limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- try
- {
- _sendPauseBarrier.await();
- }
- catch (InterruptedException e)
+ while ((_maxPendingSize > 0))
{
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
+ // Get the size estimate of sent but not yet received messages.
+ int unreceived = _unreceived.get();
+ int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+
+ if (unreceivedSize > _maxPendingSize)
+ {
+ log.debug("unreceived size estimate over limit = " + unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ try
+ {
+ // _sendPauseBarrier.await();
+ _sendPauseMonitor.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ /*catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }*/
+ }
+ else
+ {
+ break;
+ }
}
}
@@ -1085,6 +1103,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_producer.send(destination, message);
}
+ // Increase the unreceived size, this may actually happen aftern the message is recevied.
+ _unreceived.getAndIncrement();
+
// Apply message rate throttling if a rate limit has been set up.
if (_rateLimiter != null)
{
@@ -1300,6 +1321,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
*
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ *
* @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
* method, because commits only apply to transactional pingers, but fail after send applied to transactional and
* non-transactional alike.