summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java4
11 files changed, 42 insertions, 53 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
index c5f2d1e808..5a5ef5e6c5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -59,9 +58,8 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager
return _bytesCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
- final long msgSize = msg.getSize();
if(hasCredit())
{
if(_bytesCredit.addAndGet(-msgSize) >= 0)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
index b47f986155..c6771177ac 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
@@ -118,7 +117,7 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
return (_bytesCredit != 0L && _messageCredit != 0L);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(long msgSize)
{
if(_messageCredit >= 0L)
{
@@ -130,10 +129,10 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
return true;
}
- else if(msg.getSize() <= _bytesCredit)
+ else if(msgSize <= _bytesCredit)
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -151,9 +150,9 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl
}
else if(_bytesCredit >= 0L)
{
- if(msg.getSize() <= _bytesCredit)
+ if(msgSize <= _bytesCredit)
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index bec51d361d..8a80262983 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -41,6 +40,6 @@ public interface FlowCreditManager
public boolean hasCredit();
- public boolean useCreditForMessage(ServerMessage msg);
+ public boolean useCreditForMessage(long msgSize);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
index 901b71fd1f..6fcc687440 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -47,7 +46,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements
return true;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
index 19a9ac1d23..0e6ce70a60 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -62,7 +61,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
return (_messageCredit > 0L) && ( _bytesCredit > 0L );
}
- public synchronized boolean useCreditForMessage(ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCredit == 0L)
{
@@ -71,7 +70,6 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
}
else
{
- final long msgSize = msg.getSize();
if(msgSize > _bytesCredit)
{
setSuspended(true);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
index a386f66b11..9b7c40e923 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
@@ -1,6 +1,5 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -61,7 +60,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
return _messageCredit.get() > 0L;
}
- public boolean useCreditForMessage(ServerMessage msg)
+ public boolean useCreditForMessage(long msgSize)
{
if(hasCredit())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
index 026804439c..a193f8fae4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
@@ -133,7 +132,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
&& (_messageCreditLimit == 0L || _messageCredit > 0);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit != 0L)
{
@@ -147,10 +146,10 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
_messageCredit--;
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
@@ -176,9 +175,9 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
}
else
{
- if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
+ if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit))
{
- _bytesCredit -= msg.getSize();
+ _bytesCredit -= msgSize;
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index 10f578551a..9623be595c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.log4j.Logger;
public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
+ private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+
private volatile long _bytesCreditLimit;
private volatile long _messageCreditLimit;
@@ -70,31 +72,30 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
+ _messageUsed -= messageCredit;
+ if(_messageUsed < 0L)
+ {
+ LOGGER.error("Message credit used value was negative: "+ _messageUsed);
+ _messageUsed = 0;
+ }
+
boolean notifyIncrease = true;
+
if(_messageCreditLimit > 0L)
{
notifyIncrease = (_messageUsed != _messageCreditLimit);
- _messageUsed -= messageCredit;
-
- //TODO log warning
- if(_messageUsed < 0L)
- {
- _messageUsed = 0;
- }
}
-
+ _bytesUsed -= bytesCredit;
+ if(_bytesUsed < 0L)
+ {
+ LOGGER.error("Bytes credit used value was negative: "+ _messageUsed);
+ _bytesUsed = 0;
+ }
if(_bytesCreditLimit > 0L)
{
notifyIncrease = notifyIncrease && bytesCredit>0;
- _bytesUsed -= bytesCredit;
-
- //TODO log warning
- if(_bytesUsed < 0L)
- {
- _bytesUsed = 0;
- }
if(notifyIncrease)
{
@@ -102,10 +103,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
}
-
-
setSuspended(!hasCredit());
-
}
@@ -116,7 +114,7 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
&& (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
}
- public synchronized boolean useCreditForMessage(final ServerMessage msg)
+ public synchronized boolean useCreditForMessage(final long msgSize)
{
if(_messageCreditLimit >= 0L)
{
@@ -128,10 +126,10 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
return true;
}
- else if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ else if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
_messageUsed++;
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
@@ -149,9 +147,9 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
else if(_bytesCreditLimit >= 0L)
{
- if(_bytesUsed + msg.getSize() <= _bytesCreditLimit)
+ if(_bytesUsed + msgSize <= _bytesCreditLimit)
{
- _bytesUsed += msg.getSize();
+ _bytesUsed += msgSize;
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 14ce85530e..ef9711004d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -132,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- singleMessageCredit.useCreditForMessage(entry.getMessage());
+ singleMessageCredit.useCreditForMessage(entry.getMessage().getSize());
if(entry.getMessage() instanceof AMQMessage)
{
session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index bdf9c7119f..6603f58104 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -234,7 +234,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean wouldSuspend(QueueEntry msg)
{
- return !getCreditManager().useCreditForMessage(msg.getMessage());
+ return !getCreditManager().useCreditForMessage(msg.getMessage().getSize());
}
}
@@ -575,7 +575,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean wouldSuspend(QueueEntry msg)
{
- return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
+ return !_creditManager.useCreditForMessage(msg.getMessage().getSize());//_channel.wouldSuspend(msg.getMessage());
}
public void getSendLock()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index c5d6bc203c..9d52901fef 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -653,9 +653,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
_deleted.set(true);
}
- public boolean wouldSuspend(QueueEntry msg)
+ public boolean wouldSuspend(QueueEntry entry)
{
- return !_creditManager.useCreditForMessage(msg.getMessage());
+ return !_creditManager.useCreditForMessage(entry.getMessage().getSize());
}
public void getSendLock()