diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-10-03 15:11:24 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-10-03 15:11:24 +0000 |
commit | 587542968e8f262b077d22bd4556044a5d65bef9 (patch) | |
tree | cafff79bf70073c2c4c1cd978cab61a810be6a56 /java/client/src | |
parent | a6e80e575a8ca66f0f4cc9bf6ae1ec4b37c073e9 (diff) | |
download | qpid-python-587542968e8f262b077d22bd4556044a5d65bef9.tar.gz |
client/* - Only Create a Threshold Listener if if the acknowledgeMode is NO_ACK
common/*/framing/* - White space changes from tabs to 4 spaces.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@452529 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r-- | java/client/src/org/apache/qpid/client/AMQSession.java | 7 | ||||
-rw-r--r-- | java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java | 19 |
2 files changed, 20 insertions, 6 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 4768399036..3bc670e609 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -220,6 +220,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _channelId = channelId; _messageFactoryRegistry = messageFactoryRegistry; _defaultPrefetch = defaultPrefetch; + if (_acknowledgeMode == NO_ACKNOWLEDGE) + { _queue = new FlowControllingBlockingQueue(_defaultPrefetch, new FlowControllingBlockingQueue.ThresholdListener() { @@ -241,6 +243,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } }); + } + else + { + _queue = new FlowControllingBlockingQueue(_defaultPrefetch,null); + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode) diff --git a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index ad2ca7b731..89e6968e44 100644 --- a/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -63,11 +63,14 @@ public class FlowControllingBlockingQueue public Object take() throws InterruptedException { Object o = _queue.take(); - synchronized (_listener) + if (_listener != null) { - if (--_count == (_flowControlThreshold - 1)) + synchronized(_listener) { - _listener.underThreshold(_count); + if (--_count == (_flowControlThreshold - 1)) + { + _listener.underThreshold(_count); + } } } return o; @@ -76,12 +79,16 @@ public class FlowControllingBlockingQueue public void add(Object o) { _queue.add(o); - synchronized (_listener) + if (_listener != null) { - if (++_count == _flowControlThreshold) + synchronized(_listener) { - _listener.aboveThreshold(_count); + if (++_count == _flowControlThreshold) + { + _listener.aboveThreshold(_count); + } } } } } + |