diff options
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java | 49 |
1 files changed, 17 insertions, 32 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 18448848a6..feb6f6b1fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -75,7 +75,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if (queue == null) { _log.info("No queue for '" + body.queue + "'"); - if(body.queue!=null) + if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg); @@ -83,7 +83,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic else { String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg ); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg); } } else @@ -91,15 +91,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic try { AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, - body.arguments, body.noLocal, body.exclusive); + body.arguments, body.noLocal, body.exclusive); if (!body.nowait) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag)); // consumerTag + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag)); // consumerTag } //now allow queue to start async processing of any backlog of messages @@ -108,43 +108,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.INVALID_SELECTOR.getCode(), // replyCode - new AMQShortString(ise.getMessage()))); // replyText + throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage()); } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg)); // replyText + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), + "Non-unique consumer tag, '" + body.consumerTag + "'"); } catch (AMQQueue.ExistingExclusiveSubscription e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an existing exclusive consumer"); + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an existing exclusive consumer"); } catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) - { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), - "Cannot subscribe to queue " - + queue.getName() - + " exclusively as it already has a consumer"); - } + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + "Cannot subscribe to queue " + + queue.getName() + + " exclusively as it already has a consumer"); + } } } |