diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-11-27 18:09:33 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-11-27 18:09:33 +0000 |
commit | 933a38ba979882a96bfa5f9e7db28527c7feaf04 (patch) | |
tree | f033bd4069a941c5ed5b5299f195e3a8365ecae7 | |
parent | 2fb870a12430cebacabf97af06dc121d492865c2 (diff) | |
download | qpid-python-933a38ba979882a96bfa5f9e7db28527c7feaf04.tar.gz |
QPID-621 : Patch Supplied by Aidan Skinner. Msg Ack after msg consumer is closed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@598721 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 46 insertions, 2 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 804c846572..fd795392ee 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -106,8 +106,10 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -243,6 +245,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private Map<AMQShortString, BasicMessageConsumer> _consumers = new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + + /** + * Contains a list of consumers which have been removed but which might still have + * messages to acknowledge, eg in client ack or transacted modes + */ + private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>(); /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = @@ -647,6 +655,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi lastTag = next; } } + + if (_transacted) + { + // Do the above, but for consumers which have been de-registered since the + // last commit + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + Long next = _removedConsumers.get(i).getLastDelivered(); + if (next != null && next > lastTag) + { + lastTag = next; + } + _removedConsumers.remove(i); + } + } if (lastTag != -1) { @@ -1690,6 +1714,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } + + + // Consumers that are closed in a transaction must be stored + // so that messages they have received can be acknowledged on commit + if (_transacted) + { + _removedConsumers.add(consumer); + } } } @@ -2652,7 +2684,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Reject messages on pre-receive queue - consumer.rollback(); + consumer.rollbackPendingMessages(); // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); @@ -2695,6 +2727,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + for (int i = 0; i < _removedConsumers.size(); i++) + { + // Sends acknowledgement to server + _removedConsumers.get(i).rollback(); + _removedConsumers.remove(i); + } setConnectionStopped(isStopped); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 4f8a3e5557..773401d03a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -953,7 +953,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - // rollback pending messages + rollbackPendingMessages(); + } + + public void rollbackPendingMessages() + { if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 224463a446..3e2c027289 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -458,6 +458,7 @@ public class CommitRollbackTest extends TestCase assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); _logger.info("Closing Consumer"); + _session.rollback(); _consumer.close(); _logger.info("Creating New consumer"); |