summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java18
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java8
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java2
4 files changed, 26 insertions, 11 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index d73d019000..7ab3fbb1f5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
return _stopped.get();
}
- public void acknowledge(MessageInstance entry)
+ public boolean deleteAcquired(MessageInstance entry)
{
- // TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(getConsumer()))
{
- _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
- _unacknowledgedCount.decrementAndGet();
+ acquisitionRemoved(entry);
entry.delete();
+ return true;
}
+ else
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public void acquisitionRemoved(final MessageInstance entry)
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
+ _unacknowledgedCount.decrementAndGet();
}
public void flush()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 4420709a91..94f04bbae3 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
{
_target.getSessionModel().acknowledge(_target, _entry);
}
else
{
- _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+ _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed");
}
}
@@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
else
{
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed");
}
}
@@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
else
{
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed");
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index cd1146ac0b..7917b7989a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private long _messageSize;
private boolean _restoreCredit;
public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
@@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ if(restoreCredit)
+ {
+ _messageSize = entry.getMessage().getSize();
+ }
}
public void onComplete(Method method)
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry.getMessage());
+ _sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()))
+ if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 3fe1515b18..b1c22fe823 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -460,7 +460,7 @@ public class ServerSession extends Session
public void postCommit()
{
- sub.acknowledge(entry);
+ sub.deleteAcquired(entry);
}
public void onRollback()