diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 804c846572..fd795392ee 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -106,8 +106,10 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -243,6 +245,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + + /** + * Contains a list of consumers which have been removed but which might still have + * messages to acknowledge, eg in client ack or transacted modes + */ + private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = @@ -647,6 +655,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi lastTag = next; } } + + if (_transacted) + { + // Do the above, but for consumers which have been de-registered since the + // last commit + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + Long next = _removedConsumers.get(i).getLastDelivered(); + if (next != null && next > lastTag) + { + lastTag = next; + } + _removedConsumers.remove(i); + } + } if (lastTag != -1) { @@ -1690,6 +1714,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } + + + // Consumers that are closed in a transaction must be stored + // so that messages they have received can be acknowledged on commit + if (_transacted) + { + _removedConsumers.add(consumer); + } } } @@ -2652,7 +2684,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Reject messages on pre-receive queue - consumer.rollback(); + consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); @@ -2695,6 +2727,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).rollback(); + _removedConsumers.remove(i); + } setConnectionStopped(isStopped); } |