diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 09:06:47 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-16 09:06:47 +0000 |
commit | f45338ccb5afdf1ff2484f0ebf3f74abf2acd263 (patch) | |
tree | 93ff03ce9a186a3ffeb5bbce83296dc0741fe0a9 | |
parent | 7a48e7adf5d8db51c58878888f8a7ca62da16cf5 (diff) | |
download | qpid-python-f45338ccb5afdf1ff2484f0ebf3f74abf2acd263.tar.gz |
BasicConsumeMethodHandler - tidied up local channel/connection close frame writes by using the body.get[Channel|Connection]Exception() to throw a new exception to write out the frames.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508351 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java | 49 |
1 files changed, 17 insertions, 32 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 18448848a6..feb6f6b1fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -75,7 +75,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic if (queue == null) { _log.info("No queue for '" + body.queue + "'"); - if(body.queue!=null) + if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg); @@ -83,7 +83,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic else { String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg ); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg); } } else @@ -91,15 +91,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic try { AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, - body.arguments, body.noLocal, body.exclusive); + body.arguments, body.noLocal, body.exclusive); if (!body.nowait) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - consumerTag)); // consumerTag + (byte) 8, (byte) 0, // AMQP version (major, minor) + consumerTag)); // consumerTag } //now allow queue to start async processing of any backlog of messages @@ -108,43 +108,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic catch (AMQInvalidSelectorException ise) { _log.info("Closing connection due to invalid selector"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.INVALID_SELECTOR.getCode(), // replyCode - new AMQShortString(ise.getMessage()))); // replyText + throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage()); } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - BasicConsumeBody.getClazz((byte)8, (byte)0), // classId - BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg)); // replyText + throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), + "Non-unique consumer tag, '" + body.consumerTag + "'"); } catch (AMQQueue.ExistingExclusiveSubscription e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an existing exclusive consumer"); + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an existing exclusive consumer"); } catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) - { - throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), - "Cannot subscribe to queue " - + queue.getName() - + " exclusively as it already has a consumer"); - } + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + "Cannot subscribe to queue " + + queue.getName() + + " exclusively as it already has a consumer"); + } } } |