diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 29 |
1 files changed, 12 insertions, 17 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 47da59724c..3c24c67f9b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -66,19 +66,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private boolean _preAcquire = true; /** - * Indicate whether this consumer is started. - */ - private boolean _isStarted = false; - - /** * Specify whether this consumer is performing a sync receive */ private final AtomicBoolean _syncReceive = new AtomicBoolean(false); private String _consumerTagString; private long capacity = 0; - - //--- constructor + protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, @@ -104,7 +98,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _preAcquire = false; } } - _isStarted = connection.started(); // Destination setting overrides connection defaults if (destination.getDestSyntax() == DestSyntax.ADDR && @@ -172,8 +165,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - //----- overwritten methods - /** * This method is invoked when this consumer is stopped. * It tells the broker to stop delivering messages to this consumer. @@ -203,11 +194,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM super.notifyMessage(messageFrame); } - @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + @Override + protected void preDeliver(AbstractJMSMessage jmsMsg) { - super.preApplicationProcessing(jmsMsg); - if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE) + super.preDeliver(jmsMsg); + + if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) { + //For 0-10 we need to ensure that all messages are indicated processed in some way to + //ensure their AMQP command-id is marked completed, and so we must send a completion + //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring. + //Add message to the unacked message list to ensure we dont lose record of it before + //sending a completion of some sort. _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } } @@ -219,7 +217,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _messageFactory.createMessage(msg.getMessageTransfer()); } - // private methods /** * Check whether a message can be delivered to this consumer. * @@ -457,10 +454,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE && - !_session.isInRecovery() && - _session.getAMQConnection().getSyncAck()) + !_session.isInRecovery() && _session.getAMQConnection().getSyncAck()) { - ((AMQSession_0_10) getSession()).flushAcknowledgments(); ((AMQSession_0_10) getSession()).getQpidSession().sync(); } } |