diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index 5342a7f518..0343457a73 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -25,7 +25,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.access.Permission; @@ -97,8 +96,18 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic final AMQShortString consumerTagName; - //Perform ACLs - vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue); + // Check authz + if (!vHost.getAccessManager().authoriseConsume(session, + body.getExclusive(), body.getNoAck(), + body.getNoLocal(), body.getNowait(), queue)) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); + } + else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getName() + " is exclusive, but not created on this Connection."); + } if (body.getConsumerTag() != null) { @@ -111,17 +120,31 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic try { - AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(), - body.getArguments(), body.getNoLocal(), body.getExclusive()); - if (!body.getNowait()) + if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) { - MethodRegistry methodRegistry = session.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - session.writeFrame(responseBody.generateFrame(channelId)); + AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(), + body.getArguments(), body.getNoLocal(), body.getExclusive()); + if (!body.getNowait()) + { + MethodRegistry methodRegistry = session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); + session.writeFrame(responseBody.generateFrame(channelId)); + + } + } + else + { + AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); + + MethodRegistry methodRegistry = session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg, // replytext + body.getClazz(), + body.getMethod()); + session.writeFrame(responseBody.generateFrame(0)); } - } catch (org.apache.qpid.AMQInvalidArgumentException ise) { @@ -136,17 +159,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } - catch (ConsumerTagNotUniqueException e) - { - AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - - MethodRegistry methodRegistry = session.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg, // replytext - body.getClazz(), - body.getMethod()); - session.writeFrame(responseBody.generateFrame(0)); - } catch (AMQQueue.ExistingExclusiveSubscription e) { throw body.getChannelException(AMQConstant.ACCESS_REFUSED, |