diff options
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 58b84910d4..bfdaa66618 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -298,12 +298,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // the current message received is not good, so we need to get a message. if (getMessageListener() == null) { + int oldval = _messageCounter.intValue(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + if( _messageCounter.intValue() <= oldval ) + { + // we haven't received a message so tell the receiver to return null + _synchronousQueue.add(new NullTocken()); + } + else + { + _messageCounter.decrementAndGet(); + } } + // we now need to check if we have received a message + } catch(Exception e) { @@ -378,11 +390,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By return result; } - void preDeliver(AbstractJMSMessage msg) - { - _messageCounter.decrementAndGet(); - super.preDeliver(msg); - } public void setMessageListener(final MessageListener messageListener) throws JMSException { @@ -443,12 +450,23 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { o = _synchronousQueue.take(); } - else - { - System.out.println("null"); - } } } + if( o instanceof NullTocken ) + { + o = null; + } return o; } + + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + { + _messageCounter.decrementAndGet(); + super.preApplicationProcessing(jmsMsg); + } + + private class NullTocken + { + + } }
\ No newline at end of file |