diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 17:21:44 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 17:21:44 +0000 |
commit | 172fa169d687be4766a6864cde54edb6b4901fbd (patch) | |
tree | e524965c959c1f5c56ef6b4373deb057360378db | |
parent | 44755686b6aeea96a3a5fbace84ddd62c9d8bbb2 (diff) | |
download | qpid-python-172fa169d687be4766a6864cde54edb6b4901fbd.tar.gz |
increased number of runs
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@584826 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 2 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 19 |
2 files changed, 18 insertions, 3 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index fa85afc6e8..84bad2e487 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -266,7 +266,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } - private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index c273205b2d..335fd2252d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -40,6 +40,7 @@ import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * This is a 0.10 message consumer. @@ -48,6 +49,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By implements org.apache.qpidity.nclient.util.MessageListener { /** + * A counter for keeping the number of available messages for this consumer + */ + private final AtomicLong _messageCounter = new AtomicLong(0); + /** * This class logger */ protected final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -78,7 +83,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); _0_10session = (AMQSession_0_10) session; - if (messageSelector != null && messageSelector != "") + if (messageSelector != null && ! messageSelector.equals("") ) { try { @@ -161,6 +166,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By newMessage.setReplyToURL(replyToUrl); } newMessage.setContentHeader(headers); + // increase the counter of messages + _messageCounter.incrementAndGet(); getSession().messageReceived(newMessage); // else ignore this message } @@ -348,6 +355,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By return result; } + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + { + _messageCounter.decrementAndGet(); + super.preApplicationProcessing(jmsMsg); + } public void setMessageListener(final MessageListener messageListener) throws JMSException { @@ -393,7 +405,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - o = _synchronousQueue.poll(); + if( _messageCounter.get() > 0 ) + { + o = _synchronousQueue.take(); + } } } else |