diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 102 |
1 files changed, 40 insertions, 62 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 3c24c67f9b..5fba351d8a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -150,13 +150,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (isMessageListenerSet() && capacity == 0) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); } + else + { + // if we are synchronously waiting for a message + // and messages are not pre-fetched we then need to request another one + if(capacity == 0) + { + messageFlow(); + } + } } catch (AMQException e) { @@ -245,6 +252,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _logger.debug("messageOk " + messageOk); _logger.debug("_preAcquire " + _preAcquire); } + if (!messageOk) { if (_preAcquire) @@ -261,19 +269,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_logger.isDebugEnabled()) { - _logger.debug("Message not OK, releasing"); + _logger.debug("filterMessage - not ack'ing messaage as not aquired"); } - releaseMessage(message); - } - // if we are syncrhonously waiting for a message - // and messages are not prefetched we then need to request another one - if(capacity == 0) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); } } + // now we need to acquire this message if needed // this is the case of queue with a message selector set if (!_preAcquire && messageOk && !isNoConsume()) @@ -285,6 +285,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM messageOk = acquireMessage(message); _logger.debug("filterMessage - message acquire status : " + messageOk); } + return messageOk; } @@ -295,38 +296,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM * @param message The message to be acknowledged * @throws AMQException If the message cannot be acquired due to some internal error. */ - private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException + private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException { - if (!_preAcquire) - { - RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - _0_10session.messageAcknowledge - (ranges, - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - - AMQException amqe = _0_10session.getCurrentException(); - if (amqe != null) - { - throw amqe; - } - } - } + final RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); + _0_10session.messageAcknowledge + (ranges, + _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - /** - * Release a message - * - * @param message The message to be released - * @throws AMQException If the message cannot be released due to some internal error. - */ - private void releaseMessage(AbstractJMSMessage message) throws AMQException - { - if (_preAcquire) + final AMQException amqe = _0_10session.getCurrentException(); + if (amqe != null) { - RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); - _0_10session.getQpidSession().messageRelease(ranges); - _0_10session.sync(); + throw amqe; } } @@ -337,25 +318,28 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM * @return true if the message has been acquired, false otherwise. * @throws AMQException If the message cannot be acquired due to some internal error. */ - private boolean acquireMessage(AbstractJMSMessage message) throws AMQException + private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException { boolean result = false; - if (!_preAcquire) - { - RangeSet ranges = new RangeSet(); - ranges.add((int) message.getDeliveryTag()); + final RangeSet ranges = new RangeSet(); + ranges.add((int) message.getDeliveryTag()); - Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); + final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); - RangeSet acquired = acq.getTransfers(); - if (acquired != null && acquired.size() > 0) - { - result = true; - } + final RangeSet acquired = acq.getTransfers(); + if (acquired != null && acquired.size() > 0) + { + result = true; } return result; } + private void messageFlow() + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } public void setMessageListener(final MessageListener messageListener) throws JMSException { @@ -364,9 +348,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (messageListener != null && capacity == 0) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } if (messageListener != null && !_synchronousQueue.isEmpty()) { @@ -389,9 +371,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (_0_10session.isStarted() && _syncReceive.get()) { - _0_10session.getQpidSession().messageFlow - (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } } @@ -412,9 +392,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); + messageFlow(); } Object o = super.getMessageFromQueue(l); if (o == null && _0_10session.isStarted()) |