summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java15
2 files changed, 5 insertions, 31 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5a8f163bee..19efe90e9e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -644,15 +644,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
{
-// i.next().acknowledgeLastDelivered();
-// }
-
- // get next acknowledgement to server
- Long next = i.next().getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ i.next().acknowledgeDelivered();
}
if (_transacted)
@@ -662,20 +654,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (int i = 0; i < _removedConsumers.size(); i++)
{
// Sends acknowledgement to server
- Long next = _removedConsumers.get(i).getLastDelivered();
- if (next != null && next > lastTag)
- {
- lastTag = next;
- }
+ _removedConsumers.get(i).acknowledgeDelivered();
_removedConsumers.remove(i);
}
}
- if (lastTag != -1)
- {
- acknowledgeMessage(lastTag, true);
- }
-
// Commits outstanding messages sent and outstanding acknowledgements.
final AMQProtocolHandler handler = getProtocolHandler();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3480ba6c78..450dde2c95 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -833,20 +833,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
/** Acknowledge up to last message delivered (if any). Used when commiting. */
- void acknowledgeLastDelivered()
+ void acknowledgeDelivered()
{
- if (!_receivedDeliveryTags.isEmpty())
+ while (!_receivedDeliveryTags.isEmpty())
{
- long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- _session.acknowledgeMessage(lastDeliveryTag, true);
+ _session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
}
}