From 385956302c8f65a0b1908efdbb8bc5df9dbffcc3 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 24 Jan 2012 23:26:46 +0000 Subject: QPID-3604 Once message stop is issued for each subscriber, the client now drains the internal queues of each subscriber. It also drains the dispatch queue. These messages are then released without marking them as redelivered. Messages that were given to the application but were not acked are also released, but are marked as redelivered. All messages received upto that point are marked as completed. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1235550 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 54 +++++++++++++++++++++- .../org/apache/qpid/client/AMQSession_0_10.java | 40 ++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 48c4e3e3e6..90e3100690 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _prefetchedMessageTags.addAll(tags); + } + } + _usingDispatcherForCleanup = true; + drainDispatchQueue(); + _usingDispatcherForCleanup = false; + + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); + } + } -- cgit v1.2.1