summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-10 08:58:38 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-10 08:58:38 +0000
commit8cc7eac4f1c528b27081684201f880661eb65e85 (patch)
treecf836f463af2804f9884347d16759f9912cb69dd
parent4f64702bcd4da0ce73622ec807d99cc3f50f309b (diff)
downloadqpid-python-8cc7eac4f1c528b27081684201f880661eb65e85.tar.gz
QPID-5504 : fixed implementation of 0-8 GET when using NoAck
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1566531 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java23
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java62
2 files changed, 59 insertions, 26 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 47700f812f..cd71db15cd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
+ public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
+ final AMQShortString consumerTag,
+ final FieldTable filters,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod deliveryMethod,
+ final RecordDeliveryMethod recordMethod) throws AMQException
+ {
+ return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException
{
return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
@@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
*/
public static final class GetNoAckConsumer extends NoAckConsumer
{
- public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+ public GetNoAckConsumer(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
+ FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
throws AMQException
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index d4bd486a99..b1d2fa5088 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -128,24 +128,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
- final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
- {
-
- @Override
- public void deliverToClient(final Consumer sub, final ServerMessage message, final
- InstanceProperties props, final long deliveryTag)
- throws AMQException
- {
- singleMessageCredit.useCreditForMessage(message.getSize());
- session.getProtocolOutputConverter().writeGetOk(message,
- props,
- channel.getChannelId(),
- deliveryTag,
- queue.getMessageCount());
-
-
- }
- };
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
@@ -167,7 +151,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(channel,
+ target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
AMQShortString.EMPTY_STRING, null,
singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
@@ -175,10 +159,48 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
sub.flush();
sub.close();
- return(!singleMessageCredit.hasCredit());
+ return(getDeliveryMethod.hasDeliveredMessage());
}
+ private static class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQProtocolSession _session;
+ private final AMQChannel _channel;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQProtocolSession session,
+ final AMQChannel channel, final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _session = session;
+ _channel = channel;
+ _queue = queue;
+ }
+
+ @Override
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag) throws AMQException
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ _session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ _channel.getChannelId(),
+ deliveryTag,
+ _queue.getMessageCount());
+
+ _deliveredMessage = true;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
}