diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 09:56:48 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-16 09:56:48 +0000 |
commit | 5b18ceda1670947eebb26485195dd30d5f239473 (patch) | |
tree | 5263e2ffa2edf5705a5570e2fa3ddc06db6d8553 | |
parent | 56e72efee5eeefa3d73df1f9fbd77058d017a8ee (diff) | |
download | qpid-python-5b18ceda1670947eebb26485195dd30d5f239473.tar.gz |
QPID-927 : Multiple acknowledgements should be coalesced into single multiple ack
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648652 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 63 |
1 files changed, 61 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index efbce6033b..4d993eafa3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -39,6 +39,10 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.SortedSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -111,6 +115,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** The last tag that was "multiple" acknowledged on this session (if transacted) */ + private long _lastAcked; + + /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ + private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>(); + + private final Object _commitLock = new Object(); + /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -126,6 +138,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private final boolean _noConsume; private List<StackTraceElement> _closedStack = null; + + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, @@ -809,9 +823,54 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeDelivered() { - while (!_receivedDeliveryTags.isEmpty()) + synchronized(_commitLock) { - _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); + ArrayList<Long> tagsToAck = new ArrayList<Long>(); + + while (!_receivedDeliveryTags.isEmpty()) + { + tagsToAck.add(_receivedDeliveryTags.poll()); + } + + Collections.sort(tagsToAck); + + long prevAcked = _lastAcked; + long oldAckPoint = -1; + + while(oldAckPoint != prevAcked) + { + oldAckPoint = prevAcked; + + Iterator<Long> tagsToAckIterator = tagsToAck.iterator(); + + while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1) + { + tagsToAckIterator.remove(); + prevAcked++; + } + + Iterator<Long> previousAckIterator = _previouslyAcked.iterator(); + while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1) + { + previousAckIterator.remove(); + prevAcked++; + } + + } + if(prevAcked != _lastAcked) + { + _session.acknowledgeMessage(prevAcked, true); + _lastAcked = prevAcked; + } + + Iterator<Long> tagsToAckIterator = tagsToAck.iterator(); + + while(tagsToAckIterator.hasNext()) + { + Long tag = tagsToAckIterator.next(); + _session.acknowledgeMessage(tag, false); + _previouslyAcked.add(tag); + } } } |