summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java36
1 files changed, 27 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 58b84910d4..bfdaa66618 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -298,12 +298,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// the current message received is not good, so we need to get a message.
if (getMessageListener() == null)
{
+ int oldval = _messageCounter.intValue();
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ if( _messageCounter.intValue() <= oldval )
+ {
+ // we haven't received a message so tell the receiver to return null
+ _synchronousQueue.add(new NullTocken());
+ }
+ else
+ {
+ _messageCounter.decrementAndGet();
+ }
}
+ // we now need to check if we have received a message
+
}
catch(Exception e)
{
@@ -378,11 +390,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
return result;
}
- void preDeliver(AbstractJMSMessage msg)
- {
- _messageCounter.decrementAndGet();
- super.preDeliver(msg);
- }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
@@ -443,12 +450,23 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
o = _synchronousQueue.take();
}
- else
- {
- System.out.println("null");
- }
}
}
+ if( o instanceof NullTocken )
+ {
+ o = null;
+ }
return o;
}
+
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ {
+ _messageCounter.decrementAndGet();
+ super.preApplicationProcessing(jmsMsg);
+ }
+
+ private class NullTocken
+ {
+
+ }
} \ No newline at end of file