/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * * */ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; public class BasicGetMethodHandler implements StateAwareMethodListener { private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler(); public static BasicGetMethodHandler getInstance() { return _instance; } private BasicGetMethodHandler() { } public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost vHost = session.getVirtualHost(); AMQChannel channel = session.getChannel(channelId); if (channel == null) { throw body.getChannelNotFoundException(channelId); } else { AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); if (queue == null) { _log.info("No queue for '" + body.getQueue() + "'"); if(body.getQueue()!=null) { throw body.getConnectionException(AMQConstant.NOT_FOUND, "No such queue, '" + body.getQueue()+ "'"); } else { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined."); } } else { //Perform ACLs vHost.getAccessManager().authorise(session, Permission.CONSUME, body, queue); if (!performGet(queue,session, channel, !body.getNoAck())) { MethodRegistry methodRegistry = session.getMethodRegistry(); // TODO - set clusterId BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); session.writeFrame(responseBody.generateFrame(channelId)); } } } } public static boolean performGet(final AMQQueue queue, final AMQProtocolSession session, final AMQChannel channel, final boolean acks) throws AMQException { final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() { int _msg; public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { singleMessageCredit.useCreditForMessage(entry.getMessage()); session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(), deliveryTag, queue.getMessageCount()); } }; final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) { channel.addUnacknowledgedMessage(entry, deliveryTag, null); } }; Subscription sub; if(acks) { sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); } else { sub = new GetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); } queue.registerSubscription(sub,false); queue.flushSubscription(sub); queue.unregisterSubscription(sub); return(!singleMessageCredit.hasCredit()); } public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription { public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException { super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } public boolean wouldSuspend(QueueEntry msg) { return !getCreditManager().useCreditForMessage(msg.getMessage()); } } }