diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org')
4 files changed, 26 insertions, 11 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index d73d019000..7ab3fbb1f5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _stopped.get(); } - public void acknowledge(MessageInstance entry) + public boolean deleteAcquired(MessageInstance entry) { - // TODO Fix Store Context / cleanup if(entry.isAcquiredBy(getConsumer())) { - _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); - _unacknowledgedCount.decrementAndGet(); + acquisitionRemoved(entry); entry.delete(); + return true; } + else + { + return false; + } + } + + @Override + public void acquisitionRemoved(final MessageInstance entry) + { + _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize()); + _unacknowledgedCount.decrementAndGet(); } public void flush() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java index 4420709a91..94f04bbae3 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java @@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi public void onAccept() { - if(_target != null && _entry.isAcquiredBy(_target.getConsumer())) + if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition()) { _target.getSessionModel().acknowledge(_target, _entry); } else { - _logger.warn("MessageAccept received for message which has not been acquired (likely client error)"); + _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed"); } } @@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageRelease received for message which has not been acquired (likely client error)"); + _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed"); } } @@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } else { - _logger.warn("MessageReject received for message which has not been acquired (likely client error)"); + _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed"); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java index cd1146ac0b..7917b7989a 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java @@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene private final ConsumerTarget_0_10 _sub; private final MessageInstance _entry; private final ServerSession _session; + private long _messageSize; private boolean _restoreCredit; public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit) @@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene _entry = entry; _session = session; _restoreCredit = restoreCredit; + if(restoreCredit) + { + _messageSize = entry.getMessage().getSize(); + } } public void onComplete(Method method) { if(_restoreCredit) { - _sub.restoreCredit(_entry.getMessage()); + _sub.getCreditManager().restoreCredit(1l, _messageSize); } - if(_entry.isAcquiredBy(_sub.getConsumer())) + if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition()) { _session.acknowledge(_sub, _entry); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 3fe1515b18..b1c22fe823 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -460,7 +460,7 @@ public class ServerSession extends Session public void postCommit() { - sub.acknowledge(entry); + sub.deleteAcquired(entry); } public void onRollback() |