diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java | 23 |
1 files changed, 20 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 0fc39a9318..bddbc329ab 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -21,8 +21,10 @@ package org.apache.qpid.client.util; import java.util.Iterator; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow @@ -35,7 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue; public class FlowControllingBlockingQueue { /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final BlockingQueue _queue = new LinkedBlockingQueue(); + private final Queue _queue = new ConcurrentLinkedQueue(); private final int _flowControlHighThreshold; private final int _flowControlLowThreshold; @@ -71,7 +73,17 @@ public class FlowControllingBlockingQueue public Object take() throws InterruptedException { - Object o = _queue.take(); + Object o = _queue.poll(); + if(o == null) + { + synchronized(this) + { + while((o = _queue.poll())==null) + { + wait(); + } + } + } if (_listener != null) { synchronized (_listener) @@ -88,7 +100,12 @@ public class FlowControllingBlockingQueue public void add(Object o) { - _queue.add(o); + synchronized(this) + { + _queue.add(o); + + notifyAll(); + } if (_listener != null) { synchronized (_listener) |