summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-21 10:08:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-21 10:08:41 +0000
commitee5af590cb289157d8c4100c7995e5d7a70a90b4 (patch)
treebfbb31ff58741614c43d12c01ead2847cbd79304
parentcc77970c60fc8b632b8074ccaf0411a537db1150 (diff)
downloadqpid-python-ee5af590cb289157d8c4100c7995e5d7a70a90b4.tar.gz
QPID-1084 : Fix AMQSession race condition on no-ack flow control
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@658614 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java23
1 files changed, 15 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index eb776ba786..6c70f1f874 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -468,12 +468,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ _suspendState.set(true);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
@@ -482,7 +485,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
+ _suspendState.set(false);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
});
@@ -3106,9 +3110,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private class SuspenderRunner implements Runnable
{
- private boolean _suspend;
+ private AtomicBoolean _suspend;
- public SuspenderRunner(boolean suspend)
+ public SuspenderRunner(AtomicBoolean suspend)
{
_suspend = suspend;
}
@@ -3117,7 +3121,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
try
{
- suspendChannel(_suspend);
+ synchronized(_suspensionLock)
+ {
+ suspendChannel(_suspend.get());
+ }
}
catch (AMQException e)
{