diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 44 |
1 files changed, 35 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3b807591b0..b1975338b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.filter.JMSSelectorFilter; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; @@ -31,6 +34,7 @@ import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -52,7 +56,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa /** The connection being used by this consumer */ protected final AMQConnection _connection; - protected final String _messageSelector; + protected final MessageFilter _messageSelectorFilter; private final boolean _noLocal; @@ -138,7 +142,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ private final boolean _autoClose; - private final boolean _noConsume; + private final boolean _browseOnly; private List<StackTraceElement> _closedStack = null; @@ -147,11 +151,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; _connection = connection; - _messageSelector = messageSelector; _noLocal = noLocal; _destination = destination; _messageFactory = messageFactory; @@ -164,10 +167,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; - _noConsume = noConsume; + _browseOnly = browseOnly; + + try + { + if (messageSelector == null || "".equals(messageSelector.trim())) + { + _messageSelectorFilter = null; + } + else + { + _messageSelectorFilter = new JMSSelectorFilter(messageSelector); + } + } + catch (final AMQInternalException ie) + { + InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue"); + ise.setLinkedException(ie); + throw ise; + } // Force queue browsers not to use acknowledge modes. - if (_noConsume) + if (_browseOnly) { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } @@ -186,7 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { checkPreConditions(); - return _messageSelector; + return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector(); } public MessageListener getMessageListener() throws JMSException @@ -345,6 +366,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return _receiving.get(); } + public MessageFilter getMessageSelectorFilter() + { + return _messageSelectorFilter; + } + public Message receive() throws JMSException { return receive(0); @@ -874,9 +900,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return _autoClose; } - public boolean isNoConsume() + public boolean isBrowseOnly() { - return _noConsume; + return _browseOnly; } public void rollback() |