From 5b18ceda1670947eebb26485195dd30d5f239473 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 16 Apr 2008 09:56:48 +0000 Subject: QPID-927 : Multiple acknowledgements should be coalesced into single multiple ack git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648652 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/BasicMessageConsumer.java | 63 +++++++++++++++++++++- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index efbce6033b..4d993eafa3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -39,6 +39,10 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.SortedSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -111,6 +115,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ private ConcurrentLinkedQueue _receivedDeliveryTags = new ConcurrentLinkedQueue(); + /** The last tag that was "multiple" acknowledged on this session (if transacted) */ + private long _lastAcked; + + /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ + private final SortedSet _previouslyAcked = new TreeSet(); + + private final Object _commitLock = new Object(); + /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -126,6 +138,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private final boolean _noConsume; private List _closedStack = null; + + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, @@ -809,9 +823,54 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeDelivered() { - while (!_receivedDeliveryTags.isEmpty()) + synchronized(_commitLock) { - _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); + ArrayList tagsToAck = new ArrayList(); + + while (!_receivedDeliveryTags.isEmpty()) + { + tagsToAck.add(_receivedDeliveryTags.poll()); + } + + Collections.sort(tagsToAck); + + long prevAcked = _lastAcked; + long oldAckPoint = -1; + + while(oldAckPoint != prevAcked) + { + oldAckPoint = prevAcked; + + Iterator tagsToAckIterator = tagsToAck.iterator(); + + while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1) + { + tagsToAckIterator.remove(); + prevAcked++; + } + + Iterator previousAckIterator = _previouslyAcked.iterator(); + while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1) + { + previousAckIterator.remove(); + prevAcked++; + } + + } + if(prevAcked != _lastAcked) + { + _session.acknowledgeMessage(prevAcked, true); + _lastAcked = prevAcked; + } + + Iterator tagsToAckIterator = tagsToAck.iterator(); + + while(tagsToAckIterator.hasNext()) + { + Long tag = tagsToAckIterator.next(); + _session.acknowledgeMessage(tag, false); + _previouslyAcked.add(tag); + } } } -- cgit v1.2.1