From 20caf04ca8d28b0ff3043bccda9a274f0b3c23ad Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 20 Dec 2007 20:08:01 +0000 Subject: QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers Ack each individual message on commit, not use multiple acks git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@606015 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 21 ++------------------- .../apache/qpid/client/BasicMessageConsumer.java | 15 +++------------ 2 files changed, 5 insertions(+), 31 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5a8f163bee..19efe90e9e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -644,15 +644,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator i = _consumers.values().iterator(); i.hasNext();) { -// i.next().acknowledgeLastDelivered(); -// } - - // get next acknowledgement to server - Long next = i.next().getLastDelivered(); - if (next != null && next > lastTag) - { - lastTag = next; - } + i.next().acknowledgeDelivered(); } if (_transacted) @@ -662,20 +654,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (int i = 0; i < _removedConsumers.size(); i++) { // Sends acknowledgement to server - Long next = _removedConsumers.get(i).getLastDelivered(); - if (next != null && next > lastTag) - { - lastTag = next; - } + _removedConsumers.get(i).acknowledgeDelivered(); _removedConsumers.remove(i); } } - if (lastTag != -1) - { - acknowledgeMessage(lastTag, true); - } - // Commits outstanding messages sent and outstanding acknowledgements. final AMQProtocolHandler handler = getProtocolHandler(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3480ba6c78..450dde2c95 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -833,20 +833,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** Acknowledge up to last message delivered (if any). Used when commiting. */ - void acknowledgeLastDelivered() + void acknowledgeDelivered() { - if (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } -- cgit v1.2.1