summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-07-01 11:19:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-07-01 11:19:20 +0000
commit8c87dc998802e4c78dd57b9eb2a6ae60a53c5cad (patch)
tree6899c1f4ecd6663d9bc33511f849df3217ef2fcc
parent873a6c01a4911e530cfbfbcdf8d543569bc37b7e (diff)
downloadqpid-python-8c87dc998802e4c78dd57b9eb2a6ae60a53c5cad.tar.gz
QPID-1084 : Applying patch previously applied to M2.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@673058 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java25
1 files changed, 16 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 6d386f31e6..6755e4da5f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -445,21 +445,25 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ _suspendState.set(true);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
public void underThreshold(int currentValue)
{
- _logger.debug(
+ _logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
+ _suspendState.set(false);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
});
@@ -2915,9 +2919,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private class SuspenderRunner implements Runnable
{
- private boolean _suspend;
+ private AtomicBoolean _suspend;
- public SuspenderRunner(boolean suspend)
+ public SuspenderRunner(AtomicBoolean suspend)
{
_suspend = suspend;
}
@@ -2926,7 +2930,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
try
{
- suspendChannel(_suspend);
+ synchronized(_suspensionLock)
+ {
+ suspendChannel(_suspend.get());
+ }
}
catch (AMQException e)
{