diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index c62a7880a8..55d636696d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -22,10 +22,13 @@ package org.apache.qpid.server.ack; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TxnOp; +import org.apache.qpid.server.queue.QueueEntry; /** * A TxnOp implementation for handling accumulated acks @@ -33,7 +36,7 @@ import org.apache.qpid.server.txn.TxnOp; public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; - private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>(); + private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>(); private final List<Long> _individual = new LinkedList<Long>(); private long _deliveryTag; private boolean _multiple; @@ -47,6 +50,7 @@ public class TxAck implements TxnOp { if (!multiple) { + //have acked a single message that is not part of //the previously acked region so record //individually @@ -85,7 +89,7 @@ public class TxAck implements TxnOp { //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: - for (UnacknowledgedMessage msg : _unacked) + for (QueueEntry msg : _unacked.values()) { if (msg.getMessage().isPersistent()) { @@ -98,7 +102,7 @@ public class TxAck implements TxnOp public void prepare(StoreContext storeContext) throws AMQException { //make persistent changes, i.e. dequeue and decrementReference - for (UnacknowledgedMessage msg : _unacked) + for (QueueEntry msg : _unacked.values()) { //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); @@ -112,7 +116,7 @@ public class TxAck implements TxnOp //in memory counter) so if we failed in prepare for full //txn, this op will have to compensate by fixing the count //in memory (persistent changes will be rolled back by store) - for (UnacknowledgedMessage msg : _unacked) + for (QueueEntry msg : _unacked.values()) { msg.getMessage().takeReference(); } @@ -121,7 +125,7 @@ public class TxAck implements TxnOp public void commit(StoreContext storeContext) { //remove the unacked messages from the channels map - _map.remove(_unacked); + _map.remove(_unacked); } public void rollback(StoreContext storeContext) |