summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-16 09:56:48 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-16 09:56:48 +0000
commit5b18ceda1670947eebb26485195dd30d5f239473 (patch)
tree5263e2ffa2edf5705a5570e2fa3ddc06db6d8553
parent56e72efee5eeefa3d73df1f9fbd77058d017a8ee (diff)
downloadqpid-python-5b18ceda1670947eebb26485195dd30d5f239473.tar.gz
QPID-927 : Multiple acknowledgements should be coalesced into single multiple ack
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@648652 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java63
1 files changed, 61 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index efbce6033b..4d993eafa3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -39,6 +39,10 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.SortedSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -111,6 +115,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+ /** The last tag that was "multiple" acknowledged on this session (if transacted) */
+ private long _lastAcked;
+
+ /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
+ private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+
+ private final Object _commitLock = new Object();
+
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -126,6 +138,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
+
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
@@ -809,9 +823,54 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/** Acknowledge up to last message delivered (if any). Used when commiting. */
void acknowledgeDelivered()
{
- while (!_receivedDeliveryTags.isEmpty())
+ synchronized(_commitLock)
{
- _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
+ ArrayList<Long> tagsToAck = new ArrayList<Long>();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ tagsToAck.add(_receivedDeliveryTags.poll());
+ }
+
+ Collections.sort(tagsToAck);
+
+ long prevAcked = _lastAcked;
+ long oldAckPoint = -1;
+
+ while(oldAckPoint != prevAcked)
+ {
+ oldAckPoint = prevAcked;
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
+ {
+ tagsToAckIterator.remove();
+ prevAcked++;
+ }
+
+ Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
+ while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
+ {
+ previousAckIterator.remove();
+ prevAcked++;
+ }
+
+ }
+ if(prevAcked != _lastAcked)
+ {
+ _session.acknowledgeMessage(prevAcked, true);
+ _lastAcked = prevAcked;
+ }
+
+ Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
+
+ while(tagsToAckIterator.hasNext())
+ {
+ Long tag = tagsToAckIterator.next();
+ _session.acknowledgeMessage(tag, false);
+ _previouslyAcked.add(tag);
+ }
}
}