summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java125
1 files changed, 70 insertions, 55 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 7c8ccf4cf9..71780f5714 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -26,17 +26,14 @@ import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.jms.Session;
-import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,11 +49,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
protected final Logger _logger = LoggerFactory.getLogger(getClass());
/**
- * The message selector filter associated with this consumer message selector
- */
- private MessageFilter _filter = null;
-
- /**
* The underlying QpidSession
*/
private AMQSession_0_10 _0_10session;
@@ -64,7 +56,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
/**
* Indicates whether this consumer receives pre-acquired messages
*/
- private boolean _preAcquire = true;
+ private final boolean _preAcquire;
/**
* Specify whether this consumer is performing a sync receive
@@ -72,44 +64,22 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
- private long capacity = 0;
+ private final long _capacity;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession session, AMQProtocolHandler protocolHandler,
+ AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+ arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
- if (messageSelector != null && !messageSelector.equals(""))
- {
- try
- {
- _filter = new JMSSelectorFilter(messageSelector);
- }
- catch (AMQInternalException e)
- {
- throw new InvalidSelectorException("cannot create consumer because of selector issue");
- }
- if (destination instanceof AMQQueue)
- {
- _preAcquire = false;
- }
- }
-
- // Destination setting overrides connection defaults
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (getSession().prefetch())
- {
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
- }
+
+ _preAcquire = evaluatePreAcquire(browseOnly, destination);
+
+ _capacity = evaluateCapacity(destination);
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
@@ -123,7 +93,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
-
@Override public void setConsumerTag(int consumerTag)
{
super.setConsumerTag(consumerTag);
@@ -149,7 +118,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (checkPreConditions(jmsMessage))
{
- if (isMessageListenerSet() && capacity == 0)
+ if (isMessageListenerSet() && _capacity == 0)
{
messageFlow();
}
@@ -160,7 +129,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
// if we are synchronously waiting for a message
// and messages are not pre-fetched we then need to request another one
- if(capacity == 0)
+ if(_capacity == 0)
{
messageFlow();
}
@@ -238,9 +207,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
// TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
- if (_messageSelector != null && !_messageSelector.equals(""))
+ if (_messageSelectorFilter != null)
{
- messageOk = _filter.matches(message);
+ messageOk = _messageSelectorFilter.matches(message);
}
}
catch (Exception e)
@@ -275,11 +244,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
flushUnwantedMessage(message);
}
}
-
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk && !isNoConsume())
+ else if (!_preAcquire && !isBrowseOnly())
{
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
if (_logger.isDebugEnabled())
{
_logger.debug("filterMessage - trying to acquire message");
@@ -368,7 +336,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
super.setMessageListener(messageListener);
try
{
- if (messageListener != null && capacity == 0)
+ if (messageListener != null && _capacity == 0)
{
messageFlow();
}
@@ -408,11 +376,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
{
messageFlow();
}
@@ -427,18 +395,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
(getConsumerTagString(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
- if (capacity > 0)
+ if (_capacity > 0)
{
_0_10session.getQpidSession().messageFlow
(getConsumerTagString(),
MessageCreditUnit.MESSAGE,
- capacity,
+ _capacity,
Option.UNRELIABLE);
}
_0_10session.syncDispatchQueue();
o = super.getMessageFromQueue(-1);
}
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(false);
}
@@ -536,4 +504,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
}
+
+ long getCapacity()
+ {
+ return _capacity;
+ }
+
+ boolean isPreAcquire()
+ {
+ return _preAcquire;
+ }
+
+ private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+ {
+ boolean preAcquire;
+ if (browseOnly)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
+ if (isQueue && getMessageSelectorFilter() != null)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ preAcquire = true;
+ }
+ }
+ return preAcquire;
+ }
+
+ private long evaluateCapacity(AMQDestination destination)
+ {
+ long capacity = 0;
+ if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+ {
+ capacity = destination.getLink().getConsumerCapacity();
+ }
+ else if (getSession().prefetch())
+ {
+ capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ }
+ return capacity;
+ }
+
}