summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java')
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java36
1 files changed, 29 insertions, 7 deletions
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index b1d2fa5088..c9a7cc69a1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -45,6 +46,7 @@ import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.EnumSet;
@@ -106,14 +108,33 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
}
- if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+ try
{
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
- // TODO - set clusterId
- BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+ if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+ {
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ // TODO - set clusterId
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+ catch (QpidSecurityException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage());
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an exclusive consumer");
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker");
}
}
}
@@ -123,7 +144,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final AMQProtocolSession session,
final AMQChannel channel,
final boolean acks)
- throws AMQException
+ throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer
{
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
@@ -186,7 +208,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
@Override
public void deliverToClient(final Consumer sub, final ServerMessage message,
- final InstanceProperties props, final long deliveryTag) throws AMQException
+ final InstanceProperties props, final long deliveryTag)
{
_singleMessageCredit.useCreditForMessage(message.getSize());
_session.getProtocolOutputConverter().writeGetOk(message,