diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java | 31 |
1 files changed, 24 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 164094ac58..d3aece9818 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -81,7 +81,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic body.arguments, body.noLocal); if (!body.nowait) { - session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + consumerTag)); // consumerTag } //now allow queue to start async processing of any backlog of messages @@ -90,16 +95,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(), - ise.getMessage(), BasicConsumeBody.CLASS_ID, - BasicConsumeBody.METHOD_ID)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.INVALID_SELECTOR.getCode(), // replyCode + ise.getMessage())); // replyText } catch (ConsumerTagNotUniqueException e) { String msg = "Non-unique consumer tag, '" + body.consumerTag + "'"; - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, - BasicConsumeBody.CLASS_ID, - BasicConsumeBody.METHOD_ID)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } } } |