diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 66 |
1 files changed, 64 insertions, 2 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e475270ecd..783678f67c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -47,6 +47,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; +import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -289,6 +290,61 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + /** + * The dispatcher should be stopped when calling this. + * + * @param consumerTag + */ + public void removePending(String consumerTag) + { + + synchronized (_lock) + { + boolean stopped = connectionStopped(); + + _dispatcher.setConnectionStopped(false); + + LinkedList<UnprocessedMessage> tmpList = new LinkedList<UnprocessedMessage>(); + + while (_queue.size() != 0) + { + UnprocessedMessage message = null; + try + { + message = (UnprocessedMessage) _queue.take(); + + if (!message.deliverBody.consumerTag.equals(consumerTag)) + { + tmpList.add(message); + } + else + { + _logger.error("Pruned pending message for consumer:" + consumerTag); + } + } + catch (InterruptedException e) + { + _logger.error("Interrupted whilst taking message"); + } + } + + if (!tmpList.isEmpty()) + { + _logger.error("Tmp list is not empty"); + } + + for (UnprocessedMessage msg : tmpList) + { + _queue.add(msg); + } + + if (stopped) + { + _dispatcher.setConnectionStopped(stopped); + } + } + } } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, @@ -599,8 +655,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //Ensure we only try and close an open session. if (!_closed.getAndSet(true)) { - // we pass null since this is not an error case - closeProducersAndConsumers(null); try { @@ -618,6 +672,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully + // we pass null since this is not an error case + closeProducersAndConsumers(null); + } catch (AMQException e) { @@ -1784,7 +1841,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void deregisterConsumer(BasicMessageConsumer consumer) { + //need to clear pending messages from session _queue that the dispatcher will handle + // or we will get + // _dispatcher.removePending(consumer.getConsumerTag()); + _consumers.remove(consumer.getConsumerTag()); + String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) { |