summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java30
1 files changed, 25 insertions, 5 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index be1135dd91..2c4a9b310a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.subscription.SubscriptionImpl;
@@ -40,8 +41,6 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -94,7 +93,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
//Perform ACLs
- vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue);
+ if (!vHost.getAccessManager().authoriseConsume(session, body.getNoAck(), queue))
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
+ }
+ else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
if (!performGet(queue,session, channel, !body.getNoAck()))
{
@@ -127,8 +134,16 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
throws AMQException
{
singleMessageCredit.useCreditForMessage(entry.getMessage());
- session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
- deliveryTag, queue.getMessageCount());
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
+ deliveryTag, queue.getMessageCount());
+ }
+ else
+ {
+ //TODO Convert AMQP 0-10 message
+ throw new RuntimeException("Not implemented conversion of 0-10 message");
+ }
}
};
@@ -178,6 +193,11 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
+ public boolean isTransient()
+ {
+ return true;
+ }
+
public boolean wouldSuspend(QueueEntry msg)
{
return !getCreditManager().useCreditForMessage(msg.getMessage());