diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 8a15fffe84..d86a2739f2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -32,6 +32,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Destination; @@ -96,6 +97,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe /** Flow control */ private FlowControlIndicator _flowControl = new FlowControlIndicator(); + private final AtomicBoolean _creditChanged = new AtomicBoolean(); /** * Creates a new session on a connection. @@ -847,6 +849,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + _creditChanged.set(true); return true; } else @@ -863,7 +866,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe int acknowledgeMode = getAcknowledgeMode(); boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED; - if(manageCredit) + if(manageCredit && _creditChanged.compareAndSet(true,false)) { new FailoverNoopSupport<>( new FailoverProtectedOperation<Void, AMQException>() |