diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 8c3692a98d..7321854034 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.store.MessageStore; import java.util.LinkedList; import java.util.List; +import java.util.Set; /** * @author Apache Software Foundation @@ -49,6 +50,8 @@ public class NonTransactionalContext implements TransactionalContext */ private final List<RequiredDeliveryException> _returnMessages; + private Set<Long> _browsedAcks; + private final MessageStore _messageStore; /** @@ -57,11 +60,12 @@ public class NonTransactionalContext implements TransactionalContext private boolean _inTran; public NonTransactionalContext(MessageStore messageStore, AMQChannel channel, - List<RequiredDeliveryException> returnMessages) + List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks) { _channel = channel; _returnMessages = returnMessages; _messageStore = messageStore; + _browsedAcks = browsedAcks; } public void beginTranIfNecessary() throws AMQException @@ -111,12 +115,19 @@ public class NonTransactionalContext implements TransactionalContext //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, // tells the server to acknowledge all outstanding mesages. _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + - unacknowledgedMessageMap.size()); + unacknowledgedMessageMap.size()); unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { public boolean callback(UnacknowledgedMessage message) throws AMQException { - message.discard(); + if (!_browsedAcks.contains(deliveryTag)) + { + message.discard(); + } + else + { + _browsedAcks.remove(deliveryTag); + } return false; } @@ -137,7 +148,14 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.drainTo(acked, deliveryTag); for (UnacknowledgedMessage msg : acked) { - msg.discard(); + if (!_browsedAcks.contains(deliveryTag)) + { + msg.discard(); + } + else + { + _browsedAcks.remove(deliveryTag); + } } } } |