summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-10-15 17:21:44 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-10-15 17:21:44 +0000
commita0f7d70a077d0501d2698abf23363b17f5f9c017 (patch)
treeed9b7acf0839d8548ca5407d258d9f47d83fe388
parent70860aeefd38dd0a16bdac23fd3733338a760e8b (diff)
downloadqpid-python-a0f7d70a077d0501d2698abf23363b17f5f9c017.tar.gz
increased number of runs
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@584826 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java19
2 files changed, 18 insertions, 3 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index fa85afc6e8..84bad2e487 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -266,7 +266,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
- private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index c273205b2d..335fd2252d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -40,6 +40,7 @@ import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This is a 0.10 message consumer.
@@ -48,6 +49,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
implements org.apache.qpidity.nclient.util.MessageListener
{
/**
+ * A counter for keeping the number of available messages for this consumer
+ */
+ private final AtomicLong _messageCounter = new AtomicLong(0);
+ /**
* This class logger
*/
protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -78,7 +83,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
_0_10session = (AMQSession_0_10) session;
- if (messageSelector != null && messageSelector != "")
+ if (messageSelector != null && ! messageSelector.equals("") )
{
try
{
@@ -161,6 +166,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
newMessage.setReplyToURL(replyToUrl);
}
newMessage.setContentHeader(headers);
+ // increase the counter of messages
+ _messageCounter.incrementAndGet();
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -348,6 +355,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
return result;
}
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ {
+ _messageCounter.decrementAndGet();
+ super.preApplicationProcessing(jmsMsg);
+ }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
@@ -393,7 +405,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- o = _synchronousQueue.poll();
+ if( _messageCounter.get() > 0 )
+ {
+ o = _synchronousQueue.take();
+ }
}
}
else