diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 4b569ccc71..d4bd486a99 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -24,27 +24,31 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.subscription.RecordDeliveryMethod; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; +import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; +import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.EnumSet; + public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> { private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); @@ -128,7 +132,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB { @Override - public void deliverToClient(final Subscription sub, final ServerMessage message, final + public void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) throws AMQException { @@ -145,25 +149,32 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag) + public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) { channel.addUnacknowledgedMessage(entry, deliveryTag, null); } }; - Subscription sub; + ConsumerTarget_0_8 target; + EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES); if(acks) { - sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); + + target = ConsumerTarget_0_8.createAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } else { - sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod); + target = ConsumerTarget_0_8.createNoAckTarget(channel, + AMQShortString.EMPTY_STRING, null, + singleMessageCredit, getDeliveryMethod, getRecordMethod); } - queue.registerSubscription(sub,false); - queue.flushSubscription(sub); - queue.unregisterSubscription(sub); + Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); + sub.flush(); + sub.close(); return(!singleMessageCredit.hasCredit()); |