summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java63
1 files changed, 61 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index efbce6033b..4d993eafa3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -39,6 +39,10 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.SortedSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -111,6 +115,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+ /** The last tag that was "multiple" acknowledged on this session (if transacted) */
+ private long _lastAcked;
+
+ /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
+ private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+
+ private final Object _commitLock = new Object();
+
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -126,6 +138,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
+
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
@@ -809,9 +823,54 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/** Acknowledge up to last message delivered (if any). Used when commiting. */
void acknowledgeDelivered()
{
- while (!_receivedDeliveryTags.isEmpty())
+ synchronized(_commitLock)
{
- _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
+ ArrayList<Long> tagsToAck = new ArrayList<Long>();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ tagsToAck.add(_receivedDeliveryTags.poll());
+ }
+
+ Collections.sort(tagsToAck);
+
+ long prevAcked = _lastAcked;
+ long oldAckPoint = -1;
+
+ while(oldAckPoint != prevAcked)
+ {
+ oldAckPoint = prevAcked;
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
+ {
+ tagsToAckIterator.remove();
+ prevAcked++;
+ }
+
+ Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
+ while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
+ {
+ previousAckIterator.remove();
+ prevAcked++;
+ }
+
+ }
+ if(prevAcked != _lastAcked)
+ {
+ _session.acknowledgeMessage(prevAcked, true);
+ _lastAcked = prevAcked;
+ }
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext())
+ {
+ Long tag = tagsToAckIterator.next();
+ _session.acknowledgeMessage(tag, false);
+ _previouslyAcked.add(tag);
+ }
}
}