diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-10 08:58:38 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-10 08:58:38 +0000 |
commit | 8cc7eac4f1c528b27081684201f880661eb65e85 (patch) | |
tree | cf836f463af2804f9884347d16759f9912cb69dd | |
parent | 4f64702bcd4da0ce73622ec807d99cc3f50f309b (diff) | |
download | qpid-python-8cc7eac4f1c528b27081684201f880661eb65e85.tar.gz |
QPID-5504 : fixed implementation of 0-8 GET when using NoAck
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1566531 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 59 insertions, 26 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 47700f812f..cd71db15cd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; @@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod()); } + public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel, + final AMQShortString consumerTag, + final FieldTable filters, + final FlowCreditManager creditManager, + final ClientDeliveryMethod deliveryMethod, + final RecordDeliveryMethod recordMethod) throws AMQException + { + return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); + } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, - AMQShortString consumerTag, FieldTable filters, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) throws AMQException + AMQShortString consumerTag, FieldTable filters, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } @@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen */ public static final class GetNoAckConsumer extends NoAckConsumer { - public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession, + public GetNoAckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, + FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index d4bd486a99..b1d2fa5088 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -128,24 +128,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); - final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() - { - - @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, final - InstanceProperties props, final long deliveryTag) - throws AMQException - { - singleMessageCredit.useCreditForMessage(message.getSize()); - session.getProtocolOutputConverter().writeGetOk(message, - props, - channel.getChannelId(), - deliveryTag, - queue.getMessageCount()); - - - } - }; + final GetDeliveryMethod getDeliveryMethod = + new GetDeliveryMethod(singleMessageCredit, session, channel, queue); final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { @@ -167,7 +151,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } else { - target = ConsumerTarget_0_8.createNoAckTarget(channel, + target = ConsumerTarget_0_8.createGetNoAckTarget(channel, AMQShortString.EMPTY_STRING, null, singleMessageCredit, getDeliveryMethod, getRecordMethod); } @@ -175,10 +159,48 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); sub.flush(); sub.close(); - return(!singleMessageCredit.hasCredit()); + return(getDeliveryMethod.hasDeliveredMessage()); } + private static class GetDeliveryMethod implements ClientDeliveryMethod + { + + private final FlowCreditManager _singleMessageCredit; + private final AMQProtocolSession _session; + private final AMQChannel _channel; + private final AMQQueue _queue; + private boolean _deliveredMessage; + + public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, + final AMQProtocolSession session, + final AMQChannel channel, final AMQQueue queue) + { + _singleMessageCredit = singleMessageCredit; + _session = session; + _channel = channel; + _queue = queue; + } + + @Override + public void deliverToClient(final Consumer sub, final ServerMessage message, + final InstanceProperties props, final long deliveryTag) throws AMQException + { + _singleMessageCredit.useCreditForMessage(message.getSize()); + _session.getProtocolOutputConverter().writeGetOk(message, + props, + _channel.getChannelId(), + deliveryTag, + _queue.getMessageCount()); + + _deliveredMessage = true; + } + + public boolean hasDeliveredMessage() + { + return _deliveredMessage; + } + } } |