diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-12-20 20:08:01 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-12-20 20:08:01 +0000 |
commit | 20caf04ca8d28b0ff3043bccda9a274f0b3c23ad (patch) | |
tree | c499e2f66225d6fae22904d3e0bf49dccc58e8b9 | |
parent | 7029bc248c48c93e243e38256d7f49a91ba54301 (diff) | |
download | qpid-python-20caf04ca8d28b0ff3043bccda9a274f0b3c23ad.tar.gz |
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@606015 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 21 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 15 |
2 files changed, 5 insertions, 31 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5a8f163bee..19efe90e9e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -644,15 +644,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) { -// i.next().acknowledgeLastDelivered(); -// } - - // get next acknowledgement to server - Long next = i.next().getLastDelivered(); - if (next != null && next > lastTag) - { - lastTag = next; - } + i.next().acknowledgeDelivered(); } if (_transacted) @@ -662,20 +654,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (int i = 0; i < _removedConsumers.size(); i++) { // Sends acknowledgement to server - Long next = _removedConsumers.get(i).getLastDelivered(); - if (next != null && next > lastTag) - { - lastTag = next; - } + _removedConsumers.get(i).acknowledgeDelivered(); _removedConsumers.remove(i); } } - if (lastTag != -1) - { - acknowledgeMessage(lastTag, true); - } - // Commits outstanding messages sent and outstanding acknowledgements. final AMQProtocolHandler handler = getProtocolHandler(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3480ba6c78..450dde2c95 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -833,20 +833,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** Acknowledge up to last message delivered (if any). Used when commiting. */ - void acknowledgeLastDelivered() + void acknowledgeDelivered() { - if (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } |