diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java')
-rw-r--r-- | java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index b1d2fa5088..c9a7cc69a1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; @@ -45,6 +46,7 @@ import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.EnumSet; @@ -106,14 +108,33 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } } - if (!performGet(queue,protocolConnection, channel, !body.getNoAck())) + try { - MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); - // TODO - set clusterId - BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); + if (!performGet(queue,protocolConnection, channel, !body.getNoAck())) + { + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); + // TODO - set clusterId + BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); - protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + protocolConnection.writeFrame(responseBody.generateFrame(channelId)); + } + } + catch (QpidSecurityException e) + { + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, + e.getMessage()); + } + catch (MessageSource.ExistingExclusiveConsumer e) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue has an exclusive consumer"); + } + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, + "The GET request has been evaluated as an exclusive consumer, " + + "this is likely due to a programming error in the Qpid broker"); } } } @@ -123,7 +144,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final AMQProtocolSession session, final AMQChannel channel, final boolean acks) - throws AMQException + throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer { final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); @@ -186,7 +208,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB @Override public void deliverToClient(final Consumer sub, final ServerMessage message, - final InstanceProperties props, final long deliveryTag) throws AMQException + final InstanceProperties props, final long deliveryTag) { _singleMessageCredit.useCreditForMessage(message.getSize()); _session.getProtocolOutputConverter().writeGetOk(message, |