diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-17 14:09:46 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-17 14:09:46 +0000 |
commit | 7b14c847e3de611d73a41d636c307c7d940e4522 (patch) | |
tree | 1b6f567637b4eb21cfb07c669bc36180816496d0 | |
parent | 2b382c2ab784f310d0ed36825668a6368f22a668 (diff) | |
download | qpid-python-7b14c847e3de611d73a41d636c307c7d940e4522.tar.gz |
Cahnged flow control for message selector
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@585514 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 58b84910d4..bfdaa66618 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -298,12 +298,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // the current message received is not good, so we need to get a message. if (getMessageListener() == null) { + int oldval = _messageCounter.intValue(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + if( _messageCounter.intValue() <= oldval ) + { + // we haven't received a message so tell the receiver to return null + _synchronousQueue.add(new NullTocken()); + } + else + { + _messageCounter.decrementAndGet(); + } } + // we now need to check if we have received a message + } catch(Exception e) { @@ -378,11 +390,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By return result; } - void preDeliver(AbstractJMSMessage msg) - { - _messageCounter.decrementAndGet(); - super.preDeliver(msg); - } public void setMessageListener(final MessageListener messageListener) throws JMSException { @@ -443,12 +450,23 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { o = _synchronousQueue.take(); } - else - { - System.out.println("null"); - } } } + if( o instanceof NullTocken ) + { + o = null; + } return o; } + + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + { + _messageCounter.decrementAndGet(); + super.preApplicationProcessing(jmsMsg); + } + + private class NullTocken + { + + } }
\ No newline at end of file |