summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-11-11 10:16:47 +0000
committerRobert Gemmell <robbie@apache.org>2011-11-11 10:16:47 +0000
commita1f730067b3be09080327ac05312eeb6da47297b (patch)
tree067c28d9b775ca11c3409d56427dc926bc356323
parent9da016b7e810e054bb2b0cf21bf1ad5573c97d4b (diff)
downloadqpid-python-a1f730067b3be09080327ac05312eeb6da47297b.tar.gz
QPID-3592: ensure that the 'used credit' values are decremented when message transfer commands 'completed' following a message.stop command attempt to restore their credit (to no effect due to the 0 credit limit) when using 0-10 Window credit mode. Add unit test, and break CreditManager dependency on ServerMessages by passing the required size value rather than the message itself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1200801 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java46
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/flow/WindowCreditManagerTest.java63
-rw-r--r--java/test-profiles/python_tests/Java010PythonExcludes3
13 files changed, 105 insertions, 56 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
index c5f2d1e808..5a5ef5e6c5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,9 +58,8 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager
return _bytesCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
- final long msgSize = msg.getSize();
if(hasCredit())
{
if(_bytesCredit.addAndGet(-msgSize) >= 0)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
index b47f986155..c6771177ac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
@@ -118,7 +117,7 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
return (_bytesCredit != 0L && _messageCredit != 0L);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(long msgSize)
{
if(_messageCredit >= 0L)
{
@@ -130,10 +129,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
return true;
}
- else if(msg.getSize() <= _bytesCredit)
+ else if(msgSize <= _bytesCredit)
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -151,9 +150,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
}
else if(_bytesCredit >= 0L)
{
- if(msg.getSize() <= _bytesCredit)
+ if(msgSize <= _bytesCredit)
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index bec51d361d..8a80262983 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -41,6 +40,6 @@ public interface FlowCreditManager
public boolean hasCredit();
- public boolean useCreditForMessage(ServerMessage msg);
+ public boolean useCreditForMessage(long msgSize);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
index 901b71fd1f..6fcc687440 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -47,7 +46,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements
return true;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
return true;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
index 19a9ac1d23..0e6ce70a60 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -62,7 +61,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
return (_messageCredit > 0L) && ( _bytesCredit > 0L );
}
- public synchronized boolean useCreditForMessage(ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCredit == 0L)
{
@@ -71,7 +70,6 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
}
else
{
- final long msgSize = msg.getSize();
if(msgSize > _bytesCredit)
{
setSuspended(true);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
index a386f66b11..9b7c40e923 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -61,7 +60,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
return _messageCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
if(hasCredit())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
index 026804439c..a193f8fae4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
@@ -133,7 +132,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
&& (_messageCreditLimit == 0L || _messageCredit > 0);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit != 0L)
{
@@ -147,10 +146,10 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -176,9 +175,9 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index 10f578551a..9623be595c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.log4j.Logger;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
+ private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -70,31 +72,30 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
+ _messageUsed -= messageCredit;
+ if(_messageUsed < 0L)
+ {
+ LOGGER.error("Message credit used value was negative: "+ _messageUsed);
+ _messageUsed = 0;
+ }
+
boolean notifyIncrease = true;
+
if(_messageCreditLimit > 0L)
{
notifyIncrease = (_messageUsed != _messageCreditLimit);
- _messageUsed -= messageCredit;
-
- //TODO log warning
- if(_messageUsed < 0L)
- {
- _messageUsed = 0;
- }
}
-
+ _bytesUsed -= bytesCredit;
+ if(_bytesUsed < 0L)
+ {
+ LOGGER.error("Bytes credit used value was negative: "+ _messageUsed);
+ _bytesUsed = 0;
+ }
if(_bytesCreditLimit > 0L)
{
notifyIncrease = notifyIncrease && bytesCredit>0;
- _bytesUsed -= bytesCredit;
-
- //TODO log warning
- if(_bytesUsed < 0L)
- {
- _bytesUsed = 0;
- }
if(notifyIncrease)
{
@@ -102,10 +103,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
}
-
-
setSuspended(!hasCredit());
-
}
@@ -116,7 +114,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
&& (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit >= 0L)
{
@@ -128,10 +126,10 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
return true;
}
- else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ else if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
_messageUsed++;
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
@@ -149,9 +147,9 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
else if(_bytesCreditLimit >= 0L)
{
- if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 14ce85530e..ef9711004d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -132,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- singleMessageCredit.useCreditForMessage(entry.getMessage());
+ singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
if(entry.getMessage() instanceof AMQMessage)
{
session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index bdf9c7119f..6603f58104 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -234,7 +234,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean wouldSuspend(QueueEntry msg)
{
- return !getCreditManager().useCreditForMessage(msg.getMessage());
+ return !getCreditManager().useCreditForMessage(msg.getMessage().getSize());
}
}
@@ -575,7 +575,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean wouldSuspend(QueueEntry msg)
{
- return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
+ return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage());
}
public void getSendLock()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 6598d07a84..eebd50b0b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -654,9 +654,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_deleted.set(true);
}
- public boolean wouldSuspend(QueueEntry msg)
+ public boolean wouldSuspend(QueueEntry entry)
{
- return !_creditManager.useCreditForMessage(msg.getMessage());
+ return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
}
public void getSendLock()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/flow/WindowCreditManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/flow/WindowCreditManagerTest.java
new file mode 100644
index 0000000000..61a9e0b446
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/flow/WindowCreditManagerTest.java
@@ -0,0 +1,63 @@
+package org.apache.qpid.server.flow;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class WindowCreditManagerTest extends QpidTestCase
+{
+ WindowCreditManager _creditManager;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _creditManager = new WindowCreditManager();
+ }
+
+ /**
+ * Tests that after the credit limit is cleared (e.g. from a message.stop command), credit is
+ * restored (e.g. from completed MessageTransfer) without increasing the available credit, and
+ * more credit is added, that the 'used' count is correct and the proper values for bytes
+ * and message credit are returned along with appropriate 'hasCredit' results (QPID-3592).
+ */
+ public void testRestoreCreditDecrementsUsedCountAfterCreditClear()
+ {
+ assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit());
+ assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit());
+
+ //give some message credit
+ _creditManager.addCredit(1, 0);
+ assertFalse("Manager should not 'haveCredit' due to having 0 bytes credit", _creditManager.hasCredit());
+ assertEquals("unexpected credit value", 1, _creditManager.getMessageCredit());
+ assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit());
+
+ //give some bytes credit
+ _creditManager.addCredit(0, 1);
+ assertTrue("Manager should 'haveCredit'", _creditManager.hasCredit());
+ assertEquals("unexpected credit value", 1, _creditManager.getMessageCredit());
+ assertEquals("unexpected credit value", 1, _creditManager.getBytesCredit());
+
+ //use all the credit
+ _creditManager.useCreditForMessage(1);
+ assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit());
+ assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit());
+ assertFalse("Manager should not 'haveCredit'", _creditManager.hasCredit());
+
+ //clear credit out (eg from a message.stop command)
+ _creditManager.clearCredit();
+ assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit());
+ assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit());
+ assertFalse("Manager should not 'haveCredit'", _creditManager.hasCredit());
+
+ //restore credit (e.g the original message transfer command got completed)
+ //this should not increase credit, because it is now limited to 0
+ _creditManager.restoreCredit(1, 1);
+ assertEquals("unexpected credit value", 0, _creditManager.getBytesCredit());
+ assertEquals("unexpected credit value", 0, _creditManager.getMessageCredit());
+ assertFalse("Manager should not 'haveCredit'", _creditManager.hasCredit());
+
+ //give more credit to open the window again
+ _creditManager.addCredit(1, 1);
+ assertEquals("unexpected credit value", 1, _creditManager.getBytesCredit());
+ assertEquals("unexpected credit value", 1, _creditManager.getMessageCredit());
+ assertTrue("Manager should 'haveCredit'", _creditManager.hasCredit());
+ }
+}
diff --git a/java/test-profiles/python_tests/Java010PythonExcludes b/java/test-profiles/python_tests/Java010PythonExcludes
index 7d9782d0ff..2ba94d644a 100644
--- a/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/java/test-profiles/python_tests/Java010PythonExcludes
@@ -66,9 +66,6 @@ qpid.tests.messaging.endpoints.SessionTests.testDoubleCommit
qpid_tests.broker_0_10.message.MessageTests.test_credit_flow_bytes
qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes
-#QPID-3592 Fails to receive more messages after restart
-qpid_tests.broker_0_10.message.MessageTests.test_window_stop
-
#QPID-3605 Durable subscriber with no-local true receives messages on re-connection
qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward