diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java | 55 |
1 files changed, 25 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 6ad704a5d8..db3a05eb52 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -22,11 +22,14 @@ package org.apache.qpid.server.ack; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TxnOp; +import org.apache.qpid.server.queue.QueueEntry; /** * A TxnOp implementation for handling accumulated acks @@ -34,7 +37,7 @@ import org.apache.qpid.server.txn.TxnOp; public class TxAck implements TxnOp { private final UnacknowledgedMessageMap _map; - private final List <UnacknowledgedMessage> _unacked = new ArrayList<UnacknowledgedMessage>(); + private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>(); private List<Long> _individual; private long _deliveryTag; private boolean _multiple; @@ -46,11 +49,12 @@ public class TxAck implements TxnOp public void update(long deliveryTag, boolean multiple) { + _unacked.clear(); if (!multiple) { if(_individual == null) { - _individual = new ArrayList<Long>(); + _individual = new ArrayList<Long>(); } //have acked a single message that is not part of //the previously acked region so record @@ -64,36 +68,29 @@ public class TxAck implements TxnOp _deliveryTag = deliveryTag; _multiple = true; } - _unacked.clear(); } public void consolidate() { if(_unacked.isEmpty()) { - consolidate(_unacked); - } - - } - - private void consolidate(List<UnacknowledgedMessage> unacked) - { - //lookup all the unacked messages that have been acked in this transaction - if (_multiple) - { - //get all the unacked messages for the accumulated - //multiple acks - _map.collect(_deliveryTag, true, unacked); - } - //get any unacked messages for individual acks outside the - //range covered by multiple acks - if(_individual != null) - { - for (Long tag : _individual) + //lookup all the unacked messages that have been acked in this transaction + if (_multiple) { - if(_deliveryTag < tag) + //get all the unacked messages for the accumulated + //multiple acks + _map.collect(_deliveryTag, true, _unacked); + } + if(_individual != null) + { + //get any unacked messages for individual acks outside the + //range covered by multiple acks + for (long tag : _individual) { - _map.collect(tag, false, unacked); + if(_deliveryTag < tag) + { + _map.collect(tag, false, _unacked); + } } } } @@ -101,12 +98,10 @@ public class TxAck implements TxnOp public boolean checkPersistent() throws AMQException { - - consolidate(); //if any of the messages in unacked are persistent the txn //buffer must be marked as persistent: - for (UnacknowledgedMessage msg : _unacked) + for (QueueEntry msg : _unacked.values()) { if (msg.getMessage().isPersistent()) { @@ -119,7 +114,7 @@ public class TxAck implements TxnOp public void prepare(StoreContext storeContext) throws AMQException { //make persistent changes, i.e. dequeue and decrementReference - for (UnacknowledgedMessage msg : _unacked) + for (QueueEntry msg : _unacked.values()) { //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); @@ -133,7 +128,7 @@ public class TxAck implements TxnOp //in memory counter) so if we failed in prepare for full //txn, this op will have to compensate by fixing the count //in memory (persistent changes will be rolled back by store) - for (UnacknowledgedMessage msg : _unacked) + for (QueueEntry msg : _unacked.values()) { msg.getMessage().takeReference(); } @@ -142,7 +137,7 @@ public class TxAck implements TxnOp public void commit(StoreContext storeContext) { //remove the unacked messages from the channels map - _map.remove(_unacked); + _map.remove(_unacked); } public void rollback(StoreContext storeContext) |