diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 55 |
1 files changed, 10 insertions, 45 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d67443a5b7..ec3e7838f5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -189,51 +189,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _fastAccessConsumers[i] = null; } } - - - public void acknowledgeDelivered() - { - - for(int i = 0; i<16; i++) - { - final BasicMessageConsumer c = _fastAccessConsumers[i]; - if(c != null) - { - c.acknowledgeDelivered(); - } - } - if(!_slowAccessConsumers.isEmpty()) - { - for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) - { - i.next().acknowledgeDelivered(); - } - } - } - - public void acknowledge() throws JMSException - { - for(int i = 0; i<16; i++) - { - final BasicMessageConsumer c = _fastAccessConsumers[i]; - if(c != null) - { - c.acknowledge(); - } - } - if(!_slowAccessConsumers.isEmpty()) - { - for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();) - { - i.next().acknowledge(); - } - } - } } - - - /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -555,14 +512,22 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @throws IllegalStateException If the session is closed. */ - public void acknowledge() throws JMSException + public void acknowledge() throws IllegalStateException { if (isClosed()) { throw new IllegalStateException("Session is already closed"); } - _consumers.acknowledge(); + while (true) + { + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + acknowledgeMessage(tag, false); + } } /** |