diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2007-12-20 20:08:01 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2007-12-20 20:08:01 +0000 |
commit | 4c55269f3725c5a29499f3615da401ad39092394 (patch) | |
tree | e1453f76b26a7ac94c572158646b2a8329a9dc19 | |
parent | 36c6875b733d71a3f2cd0029e5ca25ce18e54575 (diff) | |
download | qpid-python-4c55269f3725c5a29499f3615da401ad39092394.tar.gz |
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@606015 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 12 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a0b79b135d..8f484383ca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -617,7 +617,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) { // Sends acknowledgement to server - i.next().acknowledgeLastDelivered(); + i.next().acknowledgeDelivered(); } // Commits outstanding messages sent and outstanding acknowledgements. diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index ddaf0cfd93..96673d223e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -755,20 +755,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } /** Acknowledge up to last message delivered (if any). Used when commiting. */ - void acknowledgeLastDelivered() + void acknowledgeDelivered() { - if (!_receivedDeliveryTags.isEmpty()) + while (!_receivedDeliveryTags.isEmpty()) { - long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - _session.acknowledgeMessage(lastDeliveryTag, true); + _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false); } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java index 9629f87d46..b7c5134d64 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -26,6 +26,7 @@ import junit.framework.Test; import junit.framework.TestCase; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -121,6 +122,13 @@ public class VMTestCase extends TestCase super.tearDown(); } + public int getMessageCount(String queueName) + { + return ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1)) + .getQueueRegistry().getQueue(new AMQShortString(queueName)).getMessageCount(); + } + + public void testDummyinVMTestCase() { // keep maven happy |