summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-12-20 20:08:01 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-12-20 20:08:01 +0000
commit4c55269f3725c5a29499f3615da401ad39092394 (patch)
treee1453f76b26a7ac94c572158646b2a8329a9dc19
parent36c6875b733d71a3f2cd0029e5ca25ce18e54575 (diff)
downloadqpid-python-4c55269f3725c5a29499f3615da401ad39092394.tar.gz
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@606015 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java15
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java8
3 files changed, 12 insertions, 13 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a0b79b135d..8f484383ca 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -617,7 +617,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
{
// Sends acknowledgement to server
- i.next().acknowledgeLastDelivered();
+ i.next().acknowledgeDelivered();
}
// Commits outstanding messages sent and outstanding acknowledgements.
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index ddaf0cfd93..96673d223e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -755,20 +755,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/** Acknowledge up to last message delivered (if any). Used when commiting. */
- void acknowledgeLastDelivered()
+ void acknowledgeDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- _session.acknowledgeMessage(lastDeliveryTag, true);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
index 9629f87d46..b7c5134d64 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
@@ -26,6 +26,7 @@ import junit.framework.Test;
import junit.framework.TestCase;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -121,6 +122,13 @@ public class VMTestCase extends TestCase
super.tearDown();
}
+ public int getMessageCount(String queueName)
+ {
+ return ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(_virtualhost.substring(1))
+ .getQueueRegistry().getQueue(new AMQShortString(queueName)).getMessageCount();
+ }
+
+
public void testDummyinVMTestCase()
{
// keep maven happy