diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 32 |
1 files changed, 29 insertions, 3 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 755f2f271b..eb29d9d805 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _nextTag = 1; /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final FlowControllingBlockingQueue _queue; + public final FlowControllingBlockingQueue _queue; private Dispatcher _dispatcher; @@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi checkNotTransacted(); // throws IllegalStateException if a transacted session // this is set only here, and the before the consumer's onMessage is called it is set to false _inRecovery = true; + + boolean isSuspended = isSuspended(); + + if (!isSuspended) + { + try + { + suspendChannel(true); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, (byte) 8, (byte) 0, // AMQP version (major, minor) false)); // requeue + + if (!isSuspended) + { + try + { + suspendChannel(false); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + } boolean isInRecovery() @@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.acknowledge(); } - - } |