diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 52 |
1 files changed, 43 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d17572ad77..eb776ba786 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -182,6 +182,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _fastAccessConsumers[i] = null; } } + + + public void acknowledgeDelivered() + { + + for(int i = 0; i<16; i++) + { + final BasicMessageConsumer c = _fastAccessConsumers[i]; + if(c != null) + { + c.acknowledgeDelivered(); + } + } + if(!_slowAccessConsumers.isEmpty()) + { + for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) + { + i.next().acknowledgeDelivered(); + } + } + } + + public void acknowledge() throws JMSException + { + for(int i = 0; i<16; i++) + { + final BasicMessageConsumer c = _fastAccessConsumers[i]; + if(c != null) + { + c.acknowledge(); + } + } + if(!_slowAccessConsumers.isEmpty()) + { + for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) + { + i.next().acknowledge(); + } + } + } } @@ -500,10 +540,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw new IllegalStateException("Session is already closed"); } - for (BasicMessageConsumer consumer : _consumers.values()) - { - consumer.acknowledge(); - } + _consumers.acknowledge(); } /** @@ -725,12 +762,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // We only need to find the highest value and ack that as commit is session level. Long lastTag = -1L; - for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) - { - i.next().acknowledgeDelivered(); - } + _consumers.acknowledgeDelivered(); - if (_transacted) + if (_transacted && !_removedConsumers.isEmpty()) { // Do the above, but for consumers which have been de-registered since the // last commit |