summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java54
1 files changed, 33 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index 5342a7f518..0343457a73 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.Permission;
@@ -97,8 +96,18 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
final AMQShortString consumerTagName;
- //Perform ACLs
- vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
+ // Check authz
+ if (!vHost.getAccessManager().authoriseConsume(session,
+ body.getExclusive(), body.getNoAck(),
+ body.getNoLocal(), body.getNowait(), queue))
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
+ }
if (body.getConsumerTag() != null)
{
@@ -111,17 +120,31 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
try
{
- AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
- body.getArguments(), body.getNoLocal(), body.getExclusive());
- if (!body.getNowait())
+ if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- session.writeFrame(responseBody.generateFrame(channelId));
+ AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
+ body.getArguments(), body.getNoLocal(), body.getExclusive());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+ else
+ {
+ AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg, // replytext
+ body.getClazz(),
+ body.getMethod());
+ session.writeFrame(responseBody.generateFrame(0));
}
-
}
catch (org.apache.qpid.AMQInvalidArgumentException ise)
{
@@ -136,17 +159,6 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
- catch (ConsumerTagNotUniqueException e)
- {
- AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
-
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg, // replytext
- body.getClazz(),
- body.getMethod());
- session.writeFrame(responseBody.generateFrame(0));
- }
catch (AMQQueue.ExistingExclusiveSubscription e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,