summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java14
1 files changed, 9 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index c62a7880a8..55d636696d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -22,10 +22,13 @@ package org.apache.qpid.server.ack;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TxnOp;
+import org.apache.qpid.server.queue.QueueEntry;
/**
* A TxnOp implementation for handling accumulated acks
@@ -33,7 +36,7 @@ import org.apache.qpid.server.txn.TxnOp;
public class TxAck implements TxnOp
{
private final UnacknowledgedMessageMap _map;
- private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>();
+ private final Map<Long, QueueEntry> _unacked = new HashMap<Long,QueueEntry>();
private final List<Long> _individual = new LinkedList<Long>();
private long _deliveryTag;
private boolean _multiple;
@@ -47,6 +50,7 @@ public class TxAck implements TxnOp
{
if (!multiple)
{
+
//have acked a single message that is not part of
//the previously acked region so record
//individually
@@ -85,7 +89,7 @@ public class TxAck implements TxnOp
{
//if any of the messages in unacked are persistent the txn
//buffer must be marked as persistent:
- for (UnacknowledgedMessage msg : _unacked)
+ for (QueueEntry msg : _unacked.values())
{
if (msg.getMessage().isPersistent())
{
@@ -98,7 +102,7 @@ public class TxAck implements TxnOp
public void prepare(StoreContext storeContext) throws AMQException
{
//make persistent changes, i.e. dequeue and decrementReference
- for (UnacknowledgedMessage msg : _unacked)
+ for (QueueEntry msg : _unacked.values())
{
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
@@ -112,7 +116,7 @@ public class TxAck implements TxnOp
//in memory counter) so if we failed in prepare for full
//txn, this op will have to compensate by fixing the count
//in memory (persistent changes will be rolled back by store)
- for (UnacknowledgedMessage msg : _unacked)
+ for (QueueEntry msg : _unacked.values())
{
msg.getMessage().takeReference();
}
@@ -121,7 +125,7 @@ public class TxAck implements TxnOp
public void commit(StoreContext storeContext)
{
//remove the unacked messages from the channels map
- _map.remove(_unacked);
+ _map.remove(_unacked);
}
public void rollback(StoreContext storeContext)