diff options
author | Robert Gemmell <robbie@apache.org> | 2011-11-11 10:16:47 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2011-11-11 10:16:47 +0000 |
commit | a1f730067b3be09080327ac05312eeb6da47297b (patch) | |
tree | 067c28d9b775ca11c3409d56427dc926bc356323 | |
parent | 9da016b7e810e054bb2b0cf21bf1ad5573c97d4b (diff) | |
download | qpid-python-a1f730067b3be09080327ac05312eeb6da47297b.tar.gz |
QPID-3592: ensure that the 'used credit' values are decremented when message transfer commands 'completed' following a message.stop command attempt to restore their credit (to no effect due to the 0 credit limit) when using 0-10 Window credit mode. Add unit test, and break CreditManager dependency on ServerMessages by passing the required size value rather than the message itself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1200801 13f79535-47bb-0310-9956-ffa450edef68
13 files changed, 105 insertions, 56 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java index c5f2d1e808..5a5ef5e6c5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -59,9 +58,8 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager return _bytesCredit.get() > 0L; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { - final long msgSize = msg.getSize(); if(hasCredit()) { if(_bytesCredit.addAndGet(-msgSize) >= 0) diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java index b47f986155..c6771177ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { @@ -118,7 +117,7 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl return (_bytesCredit != 0L && _messageCredit != 0L); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(long msgSize) { if(_messageCredit >= 0L) { @@ -130,10 +129,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl return true; } - else if(msg.getSize() <= _bytesCredit) + else if(msgSize <= _bytesCredit) { _messageCredit--; - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } @@ -151,9 +150,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl } else if(_bytesCredit >= 0L) { - if(msg.getSize() <= _bytesCredit) + if(msgSize <= _bytesCredit) { - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index bec51d361d..8a80262983 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -41,6 +40,6 @@ public interface FlowCreditManager public boolean hasCredit(); - public boolean useCreditForMessage(ServerMessage msg); + public boolean useCreditForMessage(long msgSize); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java index 901b71fd1f..6fcc687440 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -47,7 +46,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements return true; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { return true; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java index 19a9ac1d23..0e6ce70a60 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; /* * @@ -62,7 +61,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl return (_messageCredit > 0L) && ( _bytesCredit > 0L ); } - public synchronized boolean useCreditForMessage(ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCredit == 0L) { @@ -71,7 +70,6 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl } else { - final long msgSize = msg.getSize(); if(msgSize > _bytesCredit) { setSuspended(true); diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java index a386f66b11..9b7c40e923 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java @@ -1,6 +1,5 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -61,7 +60,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen return _messageCredit.get() > 0L; } - public boolean useCreditForMessage(ServerMessage msg) + public boolean useCreditForMessage(long msgSize) { if(hasCredit()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java index 026804439c..a193f8fae4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { @@ -133,7 +132,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F && (_messageCreditLimit == 0L || _messageCredit > 0); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCreditLimit != 0L) { @@ -147,10 +146,10 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) { _messageCredit--; - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } @@ -176,9 +175,9 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F } else { - if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit)) + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) { - _bytesCredit -= msg.getSize(); + _bytesCredit -= msgSize; return true; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index 10f578551a..9623be595c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.message.ServerMessage; +import org.apache.log4j.Logger; public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class); + private volatile long _bytesCreditLimit; private volatile long _messageCreditLimit; @@ -70,31 +72,30 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + _messageUsed -= messageCredit; + if(_messageUsed < 0L) + { + LOGGER.error("Message credit used value was negative: "+ _messageUsed); + _messageUsed = 0; + } + boolean notifyIncrease = true; + if(_messageCreditLimit > 0L) { notifyIncrease = (_messageUsed != _messageCreditLimit); - _messageUsed -= messageCredit; - - //TODO log warning - if(_messageUsed < 0L) - { - _messageUsed = 0; - } } - + _bytesUsed -= bytesCredit; + if(_bytesUsed < 0L) + { + LOGGER.error("Bytes credit used value was negative: "+ _messageUsed); + _bytesUsed = 0; + } if(_bytesCreditLimit > 0L) { notifyIncrease = notifyIncrease && bytesCredit>0; - _bytesUsed -= bytesCredit; - - //TODO log warning - if(_bytesUsed < 0L) - { - _bytesUsed = 0; - } if(notifyIncrease) { @@ -102,10 +103,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } } - - setSuspended(!hasCredit()); - } @@ -116,7 +114,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); } - public synchronized boolean useCreditForMessage(final ServerMessage msg) + public synchronized boolean useCreditForMessage(final long msgSize) { if(_messageCreditLimit >= 0L) { @@ -128,10 +126,10 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl return true; } - else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + else if(_bytesUsed + msgSize <= _bytesCreditLimit) { _messageUsed++; - _bytesUsed += msg.getSize(); + _bytesUsed += msgSize; return true; } @@ -149,9 +147,9 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } else if(_bytesCreditLimit >= 0L) { - if(_bytesUsed + msg.getSize() <= _bytesCreditLimit) + if(_bytesUsed + msgSize <= _bytesCreditLimit) { - _bytesUsed += msg.getSize(); + _bytesUsed += msgSize; return true; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 14ce85530e..ef9711004d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -132,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { - singleMessageCredit.useCreditForMessage(entry.getMessage()); + singleMessageCredit.useCreditForMessage(entry.getMessage().getSize()); if(entry.getMessage() instanceof AMQMessage) { session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(), diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index bdf9c7119f..6603f58104 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -234,7 +234,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean wouldSuspend(QueueEntry msg) { - return !getCreditManager().useCreditForMessage(msg.getMessage()); + return !getCreditManager().useCreditForMessage(msg.getMessage().getSize()); } } @@ -575,7 +575,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean wouldSuspend(QueueEntry msg) { - return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage()); + return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage()); } public void getSendLock() diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 6598d07a84..eebd50b0b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -654,9 +654,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr _deleted.set(true); } - public boolean wouldSuspend(QueueEntry msg) + public boolean wouldSuspend(QueueEntry entry) { - return !_creditManager.useCreditForMessage(msg.getMessage()); + return !_creditManager.useCreditForMessage(entry.getMessage().getSize()); } public void getSendLock() diff --git a/java/broker/src/test/java/org/apache/qpid/server/flow/WindowCreditManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/flow/WindowCreditManagerTest.java new file mode 100644 index 0000000000..61a9e0b446 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/flow/WindowCreditManagerTest.java @@ -0,0 +1,63 @@ +package org.apache.qpid.server.flow; + +import org.apache.qpid.test.utils.QpidTestCase; + +public class WindowCreditManagerTest extends QpidTestCase +{ + WindowCreditManager _creditManager; + + protected void setUp() throws Exception + { + super.setUp(); + _creditManager = new WindowCreditManager(); + } + + /** + * Tests that after the credit limit is cleared (e.g. from a message.stop command), credit is + * restored (e.g. from completed MessageTransfer) without increasing the available credit, and + * more credit is added, that the 'used' count is correct and the proper values for bytes + * and message credit are returned along with appropriate 'hasCredit' results (QPID-3592). + */ + public void testRestoreCreditDecrementsUsedCountAfterCreditClear() + { + assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit()); + assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit()); + + //give some message credit + _creditManager.addCredit(1, 0); + assertFalse("Manager should not 'haveCredit' due to having 0 bytes credit", _creditManager.hasCredit()); + assertEquals("unexpected credit value", 1, _creditManager.getMessageCredit()); + assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit()); + + //give some bytes credit + _creditManager.addCredit(0, 1); + assertTrue("Manager should 'haveCredit'", _creditManager.hasCredit()); + assertEquals("unexpected credit value", 1, _creditManager.getMessageCredit()); + assertEquals("unexpected credit value", 1, _creditManager.getBytesCredit()); + + //use all the credit + _creditManager.useCreditForMessage(1); + assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit()); + assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit()); + assertFalse("Manager should not 'haveCredit'", _creditManager.hasCredit()); + + //clear credit out (eg from a message.stop command) + _creditManager.clearCredit(); + assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit()); + assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit()); + assertFalse("Manager should not 'haveCredit'", _creditManager.hasCredit()); + + //restore credit (e.g the original message transfer command got completed) + //this should not increase credit, because it is now limited to 0 + _creditManager.restoreCredit(1, 1); + assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit()); + assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit()); + assertFalse("Manager should not 'haveCredit'", _creditManager.hasCredit()); + + //give more credit to open the window again + _creditManager.addCredit(1, 1); + assertEquals("unexpected credit value", 1, _creditManager.getBytesCredit()); + assertEquals("unexpected credit value", 1, _creditManager.getMessageCredit()); + assertTrue("Manager should 'haveCredit'", _creditManager.hasCredit()); + } +} diff --git a/java/test-profiles/python_tests/Java010PythonExcludes b/java/test-profiles/python_tests/Java010PythonExcludes index 7d9782d0ff..2ba94d644a 100644 --- a/java/test-profiles/python_tests/Java010PythonExcludes +++ b/java/test-profiles/python_tests/Java010PythonExcludes @@ -66,9 +66,6 @@ qpid.tests.messaging.endpoints.SessionTests.testDoubleCommit qpid_tests.broker_0_10.message.MessageTests.test_credit_flow_bytes qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes -#QPID-3592 Fails to receive more messages after restart -qpid_tests.broker_0_10.message.MessageTests.test_window_stop - #QPID-3605 Durable subscriber with no-local true receives messages on re-connection qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward |