diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java | 40 |
1 files changed, 12 insertions, 28 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index 8dddac9809..dd43ae7e11 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,48 +21,27 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCredit; private volatile long _messageCredit; - public CreditCreditManager(long bytesCredit, long messageCredit) + public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCredit = bytesCredit; _messageCredit = messageCredit; setSuspended(!hasCredit()); } - - public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit) - { - _bytesCredit = bytesCredit; - _messageCredit = messageCredit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit == -1L - ? Long.MAX_VALUE - : _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit == -1L - ? Long.MAX_VALUE - : _bytesCredit; - } - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + setSuspended(!hasCredit()); } @@ -107,12 +86,17 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { // Note !=, if credit is < 0 that indicates infinite credit - return (_bytesCredit != 0L && _messageCredit != 0L); + return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting()); } public synchronized boolean useCreditForMessage(long msgSize) { - if(_messageCredit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCredit >= 0L) { if(_messageCredit > 0) { |