diff options
author | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 17:21:44 +0000 |
---|---|---|
committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-10-15 17:21:44 +0000 |
commit | a0f7d70a077d0501d2698abf23363b17f5f9c017 (patch) | |
tree | ed9b7acf0839d8548ca5407d258d9f47d83fe388 | |
parent | 70860aeefd38dd0a16bdac23fd3733338a760e8b (diff) | |
download | qpid-python-a0f7d70a077d0501d2698abf23363b17f5f9c017.tar.gz |
increased number of runs
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@584826 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 2 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 19 |
2 files changed, 18 insertions, 3 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index fa85afc6e8..84bad2e487 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -266,7 +266,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } - private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index c273205b2d..335fd2252d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -40,6 +40,7 @@ import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * This is a 0.10 message consumer. @@ -48,6 +49,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By implements org.apache.qpidity.nclient.util.MessageListener { /** + * A counter for keeping the number of available messages for this consumer + */ + private final AtomicLong _messageCounter = new AtomicLong(0); + /** * This class logger */ protected final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -78,7 +83,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); _0_10session = (AMQSession_0_10) session; - if (messageSelector != null && messageSelector != "") + if (messageSelector != null && ! messageSelector.equals("") ) { try { @@ -161,6 +166,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By newMessage.setReplyToURL(replyToUrl); } newMessage.setContentHeader(headers); + // increase the counter of messages + _messageCounter.incrementAndGet(); getSession().messageReceived(newMessage); // else ignore this message } @@ -348,6 +355,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By return result; } + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + { + _messageCounter.decrementAndGet(); + super.preApplicationProcessing(jmsMsg); + } public void setMessageListener(final MessageListener messageListener) throws JMSException { @@ -393,7 +405,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - o = _synchronousQueue.poll(); + if( _messageCounter.get() > 0 ) + { + o = _synchronousQueue.take(); + } } } else |