From e0906fef00a14798d0ba72b44f8b03e8b4a60cb4 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 11 Nov 2011 17:41:54 +0000 Subject: QPID-3592: ensure that the 'used credit' values are decremented when message transfer commands 'completed' following a message.stop command attempt to restore their credit (to no effect due to the 0 credit limit) when using 0-10 Window credit mode. Add unit test, and break CreditManager dependency on ServerMessages by passing the required size value rather than the message itself. Merged from trunk r1200801 (minus changes to python test excludes, which never existed on the 0.14 branch). git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1200985 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/flow/BytesOnlyCreditManager.java | 4 +- .../qpid/server/flow/CreditCreditManager.java | 11 +++--- .../apache/qpid/server/flow/FlowCreditManager.java | 3 +- .../qpid/server/flow/LimitlessCreditManager.java | 3 +- .../server/flow/MessageAndBytesCreditManager.java | 4 +- .../qpid/server/flow/MessageOnlyCreditManager.java | 3 +- .../qpid/server/flow/Pre0_10CreditManager.java | 11 +++--- .../qpid/server/flow/WindowCreditManager.java | 46 +++++++++++----------- .../qpid/server/handler/BasicGetMethodHandler.java | 2 +- .../qpid/server/subscription/SubscriptionImpl.java | 4 +- .../server/subscription/Subscription_0_10.java | 4 +- 11 files changed, 42 insertions(+), 53 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java index c5f2d1e808..5a5ef5e6c5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -59,9 +58,8 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager return _bytesCredit.get() > 0L; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { - final long msgSize = msg.getSize(); if(hasCredit()) { if(_bytesCredit.addAndGet(-msgSize) >= 0) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java index b47f986155..c6771177ac 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { @@ -118,7 +117,7 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl return (_bytesCredit != 0L && _messageCredit != 0L); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(long msgSize) { if(_messageCredit >= 0L) { @@ -130,10 +129,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl return true; } - else if(msg.getSize() <= _bytesCredit) + else if(msgSize <= _bytesCredit) { _messageCredit--; - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } @@ -151,9 +150,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl } else if(_bytesCredit >= 0L) { - if(msg.getSize() <= _bytesCredit) + if(msgSize <= _bytesCredit) { - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index bec51d361d..8a80262983 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -41,6 +40,6 @@ public interface FlowCreditManager public boolean hasCredit(); - public boolean useCreditForMessage(ServerMessage msg); + public boolean useCreditForMessage(long msgSize); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java index 901b71fd1f..6fcc687440 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -47,7 +46,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements return true; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java index 19a9ac1d23..0e6ce70a60 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -62,7 +61,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl return (_messageCredit > 0L) && ( _bytesCredit > 0L ); } - public synchronized boolean useCreditForMessage(ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCredit == 0L) { @@ -71,7 +70,6 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl } else { - final long msgSize = msg.getSize(); if(msgSize > _bytesCredit) { setSuspended(true); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java index a386f66b11..9b7c40e923 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -61,7 +60,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen return _messageCredit.get() > 0L; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { if(hasCredit()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java index 026804439c..a193f8fae4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { @@ -133,7 +132,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F && (_messageCreditLimit == 0L || _messageCredit > 0); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCreditLimit != 0L) { @@ -147,10 +146,10 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) { _messageCredit--; - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } @@ -176,9 +175,9 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) { - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index 10f578551a..9623be595c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; +import org.apache.log4j.Logger; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class); + private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; @@ -70,31 +72,30 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + _messageUsed -= messageCredit; + if(_messageUsed < 0L) + { + LOGGER.error("Message credit used value was negative: "+ _messageUsed); + _messageUsed = 0; + } + boolean notifyIncrease = true; + if(_messageCreditLimit > 0L) { notifyIncrease = (_messageUsed != _messageCreditLimit); - _messageUsed -= messageCredit; - - //TODO log warning - if(_messageUsed < 0L) - { - _messageUsed = 0; - } } - + _bytesUsed -= bytesCredit; + if(_bytesUsed < 0L) + { + LOGGER.error("Bytes credit used value was negative: "+ _messageUsed); + _bytesUsed = 0; + } if(_bytesCreditLimit > 0L) { notifyIncrease = notifyIncrease && bytesCredit>0; - _bytesUsed -= bytesCredit; - - //TODO log warning - if(_bytesUsed < 0L) - { - _bytesUsed = 0; - } if(notifyIncrease) { @@ -102,10 +103,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } } - - setSuspended(!hasCredit()); - } @@ -116,7 +114,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCreditLimit >= 0L) { @@ -128,10 +126,10 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl return true; } - else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + else if(_bytesUsed + msgSize <= _bytesCreditLimit) { _messageUsed++; - _bytesUsed += msg.getSize(); + _bytesUsed += msgSize; return true; } @@ -149,9 +147,9 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } else if(_bytesCreditLimit >= 0L) { - if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + if(_bytesUsed + msgSize <= _bytesCreditLimit) { - _bytesUsed += msg.getSize(); + _bytesUsed += msgSize; return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 14ce85530e..ef9711004d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -132,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener