diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 27 |
1 files changed, 7 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9611c534bb..78dc46fb1d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -3206,28 +3206,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void rejectPending(C consumer) { - synchronized (_lock) - { - boolean stopped = connectionStopped(); + // Reject messages on pre-receive queue + consumer.rollbackPendingMessages(); - if (!stopped) - { - setConnectionStopped(true); - } + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - // Reject messages on pre-receive queue - consumer.rollbackPendingMessages(); + // closeConsumer + consumer.markClosed(); - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - //Let the dispatcher deal with this when it gets to them. - - // closeConsumer - consumer.markClosed(); - - setConnectionStopped(stopped); - - } } public void rollback() @@ -3419,7 +3406,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { final C consumer = _consumers.get(message.getConsumerTag()); - if ((consumer == null) || consumer.isClosed()) + if ((consumer == null) || consumer.isClosed() || consumer.isClosing()) { if (_dispatcherLogger.isInfoEnabled()) { |