summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java32
1 files changed, 29 insertions, 3 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 755f2f271b..eb29d9d805 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -86,7 +86,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _nextTag = 1;
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final FlowControllingBlockingQueue _queue;
+ public final FlowControllingBlockingQueue _queue;
private Dispatcher _dispatcher;
@@ -804,16 +804,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
checkNotTransacted(); // throws IllegalStateException if a transacted session
// this is set only here, and the before the consumer's onMessage is called it is set to false
_inRecovery = true;
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.clearUnackedMessages();
}
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
(byte) 8, (byte) 0, // AMQP version (major, minor)
false)); // requeue
+
+ if (!isSuspended)
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
}
boolean isInRecovery()
@@ -836,8 +864,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.acknowledge();
}
-
-
}