summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-12-20 20:08:01 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-12-20 20:08:01 +0000
commit20caf04ca8d28b0ff3043bccda9a274f0b3c23ad (patch)
treec499e2f66225d6fae22904d3e0bf49dccc58e8b9
parent7029bc248c48c93e243e38256d7f49a91ba54301 (diff)
downloadqpid-python-20caf04ca8d28b0ff3043bccda9a274f0b3c23ad.tar.gz
QPID-714 : (Patch from Aidan Skinner) Issue with competing, transactional/client-ack consumers
Ack each individual message on commit, not use multiple acks git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@606015 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java15
2 files changed, 5 insertions, 31 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5a8f163bee..19efe90e9e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -644,15 +644,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
{
-// i.next().acknowledgeLastDelivered();
-// }
-
- // get next acknowledgement to server
- Long next = i.next().getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ i.next().acknowledgeDelivered();
}
if (_transacted)
@@ -662,20 +654,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (int i = 0; i < _removedConsumers.size(); i++)
{
// Sends acknowledgement to server
- Long next = _removedConsumers.get(i).getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ _removedConsumers.get(i).acknowledgeDelivered();
_removedConsumers.remove(i);
}
}
- if (lastTag != -1)
- {
- acknowledgeMessage(lastTag, true);
- }
-
// Commits outstanding messages sent and outstanding acknowledgements.
final AMQProtocolHandler handler = getProtocolHandler();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3480ba6c78..450dde2c95 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -833,20 +833,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/** Acknowledge up to last message delivered (if any). Used when commiting. */
- void acknowledgeLastDelivered()
+ void acknowledgeDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- _session.acknowledgeMessage(lastDeliveryTag, true);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
}
}