summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java27
1 files changed, 7 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ce624cb91b..55d3ccb6e7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -3212,28 +3212,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void rejectPending(C consumer)
{
- synchronized (_lock)
- {
- boolean stopped = connectionStopped();
+ // Reject messages on pre-receive queue
+ consumer.rollbackPendingMessages();
- if (!stopped)
- {
- setConnectionStopped(true);
- }
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- // Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
+ // closeConsumer
+ consumer.markClosed();
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- //Let the dispatcher deal with this when it gets to them.
-
- // closeConsumer
- consumer.markClosed();
-
- setConnectionStopped(stopped);
-
- }
}
public void rollback()
@@ -3425,7 +3412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
final C consumer = _consumers.get(message.getConsumerTag());
- if ((consumer == null) || consumer.isClosed())
+ if ((consumer == null) || consumer.isClosed() || consumer.isClosing())
{
if (_dispatcherLogger.isInfoEnabled())
{