diff options
2 files changed, 59 insertions, 26 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 47700f812f..cd71db15cd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; @@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } + public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel, + final AMQShortString consumerTag, + final FieldTable filters, + final FlowCreditManager creditManager, + final ClientDeliveryMethod deliveryMethod, + final RecordDeliveryMethod recordMethod) throws AMQException + { + return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) throws AMQException + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen */ public static final class GetNoAckConsumer extends NoAckConsumer { - public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession, + public GetNoAckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, + FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index d4bd486a99..b1d2fa5088 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -128,24 +128,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); - final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() - { - - @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, final - InstanceProperties props, final long deliveryTag) - throws AMQException - { - singleMessageCredit.useCreditForMessage(message.getSize()); - session.getProtocolOutputConverter().writeGetOk(message, - props, - channel.getChannelId(), - deliveryTag, - queue.getMessageCount()); - - - } - }; + final GetDeliveryMethod getDeliveryMethod = + new GetDeliveryMethod(singleMessageCredit, session, channel, queue); final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { @@ -167,7 +151,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { - target = ConsumerTarget_0_8.createNoAckTarget(channel, + target = ConsumerTarget_0_8.createGetNoAckTarget(channel, AMQShortString.EMPTY_STRING, null, singleMessageCredit, getDeliveryMethod, getRecordMethod); } @@ -175,10 +159,48 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); sub.flush(); sub.close(); - return(!singleMessageCredit.hasCredit()); + return(getDeliveryMethod.hasDeliveredMessage()); } + private static class GetDeliveryMethod implements ClientDeliveryMethod + { + + private final FlowCreditManager _singleMessageCredit; + private final AMQProtocolSession _session; + private final AMQChannel _channel; + private final AMQQueue _queue; + private boolean _deliveredMessage; + + public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, + final AMQProtocolSession session, + final AMQChannel channel, final AMQQueue queue) + { + _singleMessageCredit = singleMessageCredit; + _session = session; + _channel = channel; + _queue = queue; + } + + @Override + public void deliverToClient(final Consumer sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) throws AMQException + { + _singleMessageCredit.useCreditForMessage(message.getSize()); + _session.getProtocolOutputConverter().writeGetOk(message, + props, + _channel.getChannelId(), + deliveryTag, + _queue.getMessageCount()); + + _deliveredMessage = true; + } + + public boolean hasDeliveredMessage() + { + return _deliveredMessage; + } + } } |