summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-11-11 17:41:54 +0000
committerRobert Gemmell <robbie@apache.org>2011-11-11 17:41:54 +0000
commite0906fef00a14798d0ba72b44f8b03e8b4a60cb4 (patch)
tree2a4237389136496357c9667b61da748f6d6bb177
parent141737680ebf0680f74aabecd263f07106fe4cb6 (diff)
downloadqpid-python-e0906fef00a14798d0ba72b44f8b03e8b4a60cb4.tar.gz
QPID-3592: ensure that the 'used credit' values are decremented when message transfer commands 'completed' following a message.stop command attempt to restore their credit (to no effect due to the 0 credit limit) when using 0-10 Window credit mode. Add unit test, and break CreditManager dependency on ServerMessages by passing the required size value rather than the message itself.
Merged from trunk r1200801 (minus changes to python test excludes, which never existed on the 0.14 branch). git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1200985 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java4
11 files changed, 42 insertions, 53 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
index c5f2d1e808..5a5ef5e6c5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,9 +58,8 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager
return _bytesCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
- final long msgSize = msg.getSize();
if(hasCredit())
{
if(_bytesCredit.addAndGet(-msgSize) >= 0)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
index b47f986155..c6771177ac 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
@@ -118,7 +117,7 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
return (_bytesCredit != 0L && _messageCredit != 0L);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(long msgSize)
{
if(_messageCredit >= 0L)
{
@@ -130,10 +129,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
return true;
}
- else if(msg.getSize() <= _bytesCredit)
+ else if(msgSize <= _bytesCredit)
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -151,9 +150,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
}
else if(_bytesCredit >= 0L)
{
- if(msg.getSize() <= _bytesCredit)
+ if(msgSize <= _bytesCredit)
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index bec51d361d..8a80262983 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -41,6 +40,6 @@ public interface FlowCreditManager
public boolean hasCredit();
- public boolean useCreditForMessage(ServerMessage msg);
+ public boolean useCreditForMessage(long msgSize);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
index 901b71fd1f..6fcc687440 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -47,7 +46,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements
return true;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
index 19a9ac1d23..0e6ce70a60 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -62,7 +61,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
return (_messageCredit > 0L) && ( _bytesCredit > 0L );
}
- public synchronized boolean useCreditForMessage(ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCredit == 0L)
{
@@ -71,7 +70,6 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
}
else
{
- final long msgSize = msg.getSize();
if(msgSize > _bytesCredit)
{
setSuspended(true);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
index a386f66b11..9b7c40e923 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -61,7 +60,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
return _messageCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
if(hasCredit())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
index 026804439c..a193f8fae4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
@@ -133,7 +132,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
&& (_messageCreditLimit == 0L || _messageCredit > 0);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit != 0L)
{
@@ -147,10 +146,10 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -176,9 +175,9 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index 10f578551a..9623be595c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.log4j.Logger;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
+ private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -70,31 +72,30 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
+ _messageUsed -= messageCredit;
+ if(_messageUsed < 0L)
+ {
+ LOGGER.error("Message credit used value was negative: "+ _messageUsed);
+ _messageUsed = 0;
+ }
+
boolean notifyIncrease = true;
+
if(_messageCreditLimit > 0L)
{
notifyIncrease = (_messageUsed != _messageCreditLimit);
- _messageUsed -= messageCredit;
-
- //TODO log warning
- if(_messageUsed < 0L)
- {
- _messageUsed = 0;
- }
}
-
+ _bytesUsed -= bytesCredit;
+ if(_bytesUsed < 0L)
+ {
+ LOGGER.error("Bytes credit used value was negative: "+ _messageUsed);
+ _bytesUsed = 0;
+ }
if(_bytesCreditLimit > 0L)
{
notifyIncrease = notifyIncrease && bytesCredit>0;
- _bytesUsed -= bytesCredit;
-
- //TODO log warning
- if(_bytesUsed < 0L)
- {
- _bytesUsed = 0;
- }
if(notifyIncrease)
{
@@ -102,10 +103,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
}
-
-
setSuspended(!hasCredit());
-
}
@@ -116,7 +114,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
&& (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit >= 0L)
{
@@ -128,10 +126,10 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
return true;
}
- else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ else if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
_messageUsed++;
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
@@ -149,9 +147,9 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
else if(_bytesCreditLimit >= 0L)
{
- if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 14ce85530e..ef9711004d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -132,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- singleMessageCredit.useCreditForMessage(entry.getMessage());
+ singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
if(entry.getMessage() instanceof AMQMessage)
{
session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index bdf9c7119f..6603f58104 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -234,7 +234,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean wouldSuspend(QueueEntry msg)
{
- return !getCreditManager().useCreditForMessage(msg.getMessage());
+ return !getCreditManager().useCreditForMessage(msg.getMessage().getSize());
}
}
@@ -575,7 +575,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean wouldSuspend(QueueEntry msg)
{
- return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
+ return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage());
}
public void getSendLock()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index c5d6bc203c..9d52901fef 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -653,9 +653,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_deleted.set(true);
}
- public boolean wouldSuspend(QueueEntry msg)
+ public boolean wouldSuspend(QueueEntry entry)
{
- return !_creditManager.useCreditForMessage(msg.getMessage());
+ return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
}
public void getSendLock()