diff options
author | Robert Gemmell <robbie@apache.org> | 2011-11-11 17:41:54 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-11-11 17:41:54 +0000 |
commit | e0906fef00a14798d0ba72b44f8b03e8b4a60cb4 (patch) | |
tree | 2a4237389136496357c9667b61da748f6d6bb177 | |
parent | 141737680ebf0680f74aabecd263f07106fe4cb6 (diff) | |
download | qpid-python-e0906fef00a14798d0ba72b44f8b03e8b4a60cb4.tar.gz |
QPID-3592: ensure that the 'used credit' values are decremented when message transfer commands 'completed' following a message.stop command attempt to restore their credit (to no effect due to the 0 credit limit) when using 0-10 Window credit mode. Add unit test, and break CreditManager dependency on ServerMessages by passing the required size value rather than the message itself.
Merged from trunk r1200801 (minus changes to python test excludes, which never existed on the 0.14 branch).
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1200985 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 42 insertions, 53 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java index c5f2d1e808..5a5ef5e6c5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -59,9 +58,8 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager return _bytesCredit.get() > 0L; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { - final long msgSize = msg.getSize(); if(hasCredit()) { if(_bytesCredit.addAndGet(-msgSize) >= 0) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java index b47f986155..c6771177ac 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { @@ -118,7 +117,7 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl return (_bytesCredit != 0L && _messageCredit != 0L); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(long msgSize) { if(_messageCredit >= 0L) { @@ -130,10 +129,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl return true; } - else if(msg.getSize() <= _bytesCredit) + else if(msgSize <= _bytesCredit) { _messageCredit--; - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } @@ -151,9 +150,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl } else if(_bytesCredit >= 0L) { - if(msg.getSize() <= _bytesCredit) + if(msgSize <= _bytesCredit) { - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index bec51d361d..8a80262983 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -41,6 +40,6 @@ public interface FlowCreditManager public boolean hasCredit(); - public boolean useCreditForMessage(ServerMessage msg); + public boolean useCreditForMessage(long msgSize); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java index 901b71fd1f..6fcc687440 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -47,7 +46,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements return true; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java index 19a9ac1d23..0e6ce70a60 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -62,7 +61,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl return (_messageCredit > 0L) && ( _bytesCredit > 0L ); } - public synchronized boolean useCreditForMessage(ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCredit == 0L) { @@ -71,7 +70,6 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl } else { - final long msgSize = msg.getSize(); if(msgSize > _bytesCredit) { setSuspended(true); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java index a386f66b11..9b7c40e923 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -61,7 +60,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen return _messageCredit.get() > 0L; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { if(hasCredit()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java index 026804439c..a193f8fae4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { @@ -133,7 +132,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F && (_messageCreditLimit == 0L || _messageCredit > 0); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCreditLimit != 0L) { @@ -147,10 +146,10 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) { _messageCredit--; - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } @@ -176,9 +175,9 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) { - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index 10f578551a..9623be595c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; +import org.apache.log4j.Logger; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class); + private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; @@ -70,31 +72,30 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + _messageUsed -= messageCredit; + if(_messageUsed < 0L) + { + LOGGER.error("Message credit used value was negative: "+ _messageUsed); + _messageUsed = 0; + } + boolean notifyIncrease = true; + if(_messageCreditLimit > 0L) { notifyIncrease = (_messageUsed != _messageCreditLimit); - _messageUsed -= messageCredit; - - //TODO log warning - if(_messageUsed < 0L) - { - _messageUsed = 0; - } } - + _bytesUsed -= bytesCredit; + if(_bytesUsed < 0L) + { + LOGGER.error("Bytes credit used value was negative: "+ _messageUsed); + _bytesUsed = 0; + } if(_bytesCreditLimit > 0L) { notifyIncrease = notifyIncrease && bytesCredit>0; - _bytesUsed -= bytesCredit; - - //TODO log warning - if(_bytesUsed < 0L) - { - _bytesUsed = 0; - } if(notifyIncrease) { @@ -102,10 +103,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } } - - setSuspended(!hasCredit()); - } @@ -116,7 +114,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCreditLimit >= 0L) { @@ -128,10 +126,10 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl return true; } - else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + else if(_bytesUsed + msgSize <= _bytesCreditLimit) { _messageUsed++; - _bytesUsed += msg.getSize(); + _bytesUsed += msgSize; return true; } @@ -149,9 +147,9 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } else if(_bytesCreditLimit >= 0L) { - if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + if(_bytesUsed + msgSize <= _bytesCreditLimit) { - _bytesUsed += msg.getSize(); + _bytesUsed += msgSize; return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 14ce85530e..ef9711004d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -132,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { - singleMessageCredit.useCreditForMessage(entry.getMessage()); + singleMessageCredit.useCreditForMessage(entry.getMessage().getSize()); if(entry.getMessage() instanceof AMQMessage) { session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(), diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index bdf9c7119f..6603f58104 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -234,7 +234,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean wouldSuspend(QueueEntry msg) { - return !getCreditManager().useCreditForMessage(msg.getMessage()); + return !getCreditManager().useCreditForMessage(msg.getMessage().getSize()); } } @@ -575,7 +575,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean wouldSuspend(QueueEntry msg) { - return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage()); + return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage()); } public void getSendLock() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index c5d6bc203c..9d52901fef 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -653,9 +653,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _deleted.set(true); } - public boolean wouldSuspend(QueueEntry msg) + public boolean wouldSuspend(QueueEntry entry) { - return !_creditManager.useCreditForMessage(msg.getMessage()); + return !_creditManager.useCreditForMessage(entry.getMessage().getSize()); } public void getSendLock() |