summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java')
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java9
1 files changed, 7 insertions, 2 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index cd1146ac0b..7917b7989a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private long _messageSize;
private boolean _restoreCredit;
public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
@@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ if(restoreCredit)
+ {
+ _messageSize = entry.getMessage().getSize();
+ }
}
public void onComplete(Method method)
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry.getMessage());
+ _sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()))
+ if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}