summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java52
1 files changed, 43 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d17572ad77..eb776ba786 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -182,6 +182,46 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_fastAccessConsumers[i] = null;
}
}
+
+
+ public void acknowledgeDelivered()
+ {
+
+ for(int i = 0; i<16; i++)
+ {
+ final BasicMessageConsumer c = _fastAccessConsumers[i];
+ if(c != null)
+ {
+ c.acknowledgeDelivered();
+ }
+ }
+ if(!_slowAccessConsumers.isEmpty())
+ {
+ for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
+ {
+ i.next().acknowledgeDelivered();
+ }
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ for(int i = 0; i<16; i++)
+ {
+ final BasicMessageConsumer c = _fastAccessConsumers[i];
+ if(c != null)
+ {
+ c.acknowledge();
+ }
+ }
+ if(!_slowAccessConsumers.isEmpty())
+ {
+ for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
+ {
+ i.next().acknowledge();
+ }
+ }
+ }
}
@@ -500,10 +540,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw new IllegalStateException("Session is already closed");
}
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- consumer.acknowledge();
- }
+ _consumers.acknowledge();
}
/**
@@ -725,12 +762,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// We only need to find the highest value and ack that as commit is session level.
Long lastTag = -1L;
- for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
- {
- i.next().acknowledgeDelivered();
- }
+ _consumers.acknowledgeDelivered();
- if (_transacted)
+ if (_transacted && !_removedConsumers.isEmpty())
{
// Do the above, but for consumers which have been de-registered since the
// last commit