summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java26
1 files changed, 22 insertions, 4 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 8c3692a98d..7321854034 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.store.MessageStore;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
/**
* @author Apache Software Foundation
@@ -49,6 +50,8 @@ public class NonTransactionalContext implements TransactionalContext
*/
private final List<RequiredDeliveryException> _returnMessages;
+ private Set<Long> _browsedAcks;
+
private final MessageStore _messageStore;
/**
@@ -57,11 +60,12 @@ public class NonTransactionalContext implements TransactionalContext
private boolean _inTran;
public NonTransactionalContext(MessageStore messageStore, AMQChannel channel,
- List<RequiredDeliveryException> returnMessages)
+ List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
{
_channel = channel;
_returnMessages = returnMessages;
_messageStore = messageStore;
+ _browsedAcks = browsedAcks;
}
public void beginTranIfNecessary() throws AMQException
@@ -111,12 +115,19 @@ public class NonTransactionalContext implements TransactionalContext
//Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
// tells the server to acknowledge all outstanding mesages.
_log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
- unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- message.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ message.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
return false;
}
@@ -137,7 +148,14 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.drainTo(acked, deliveryTag);
for (UnacknowledgedMessage msg : acked)
{
- msg.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ msg.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
}
}
}