summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-16 09:06:47 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-16 09:06:47 +0000
commitf45338ccb5afdf1ff2484f0ebf3f74abf2acd263 (patch)
tree93ff03ce9a186a3ffeb5bbce83296dc0741fe0a9
parent7a48e7adf5d8db51c58878888f8a7ca62da16cf5 (diff)
downloadqpid-python-f45338ccb5afdf1ff2484f0ebf3f74abf2acd263.tar.gz
BasicConsumeMethodHandler - tidied up local channel/connection close frame writes by using the body.get[Channel|Connection]Exception() to throw a new exception to write out the frames.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508351 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java49
1 files changed, 17 insertions, 32 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 18448848a6..feb6f6b1fa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -75,7 +75,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
- if(body.queue!=null)
+ if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), msg);
@@ -83,7 +83,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
else
{
String msg = "No queue name provided, no default queue defined.";
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),msg );
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(), msg);
}
}
else
@@ -91,15 +91,15 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
try
{
AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
- body.arguments, body.noLocal, body.exclusive);
+ body.arguments, body.noLocal, body.exclusive);
if (!body.nowait)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag)); // consumerTag
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag)); // consumerTag
}
//now allow queue to start async processing of any backlog of messages
@@ -108,43 +108,28 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
- new AMQShortString(ise.getMessage()))); // replyText
+ throw body.getChannelException(AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage());
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+ "Non-unique consumer tag, '" + body.consumerTag + "'");
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
- "Cannot subscribe to queue "
- + queue.getName()
- + " as it already has an existing exclusive consumer");
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer");
}
catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
- {
- throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
- "Cannot subscribe to queue "
- + queue.getName()
- + " exclusively as it already has a consumer");
- }
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer");
+ }
}
}