summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java31
1 files changed, 24 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 164094ac58..d3aece9818 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -81,7 +81,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
body.arguments, body.noLocal);
if (!body.nowait)
{
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag)); // consumerTag
}
//now allow queue to start async processing of any backlog of messages
@@ -90,16 +95,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
- ise.getMessage(), BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
+ ise.getMessage())); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
- BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
}
}