summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java36
1 files changed, 19 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 145d7f8b13..2f27e1405a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
{
if (debug)
{
- _log.debug("Discarding message: " + message.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessageId());
}
- if(message.getMessage().isPersistent())
+ if(queueEntry.isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- message.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ queueEntry.dequeueAndDelete(_storeContext);
return false;
}
@@ -157,10 +157,15 @@ public class NonTransactionalContext implements TransactionalContext
}
else
{
- QueueEntry msg;
- msg = unacknowledgedMessageMap.get(deliveryTag);
+ QueueEntry queueEntry;
+ queueEntry = unacknowledgedMessageMap.get(deliveryTag);
- if (msg == null)
+ if (debug)
+ {
+ _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+ }
+
+ if (queueEntry == null)
{
_log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
@@ -170,24 +175,21 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessageId());
}
- if(msg.getMessage().isPersistent())
+ if(queueEntry.isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ queueEntry.dequeueAndDelete(_storeContext);
unacknowledgedMessageMap.remove(deliveryTag);
- if (debug)
- {
- _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.getMessage().getMessageId());
- }
}
if(_inTran)
{