summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java44
1 files changed, 35 insertions, 9 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3b807591b0..b1975338b7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
@@ -31,6 +34,7 @@ import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -52,7 +56,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
/** The connection being used by this consumer */
protected final AMQConnection _connection;
- protected final String _messageSelector;
+ protected final MessageFilter _messageSelectorFilter;
private final boolean _noLocal;
@@ -138,7 +142,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
private final boolean _autoClose;
- private final boolean _noConsume;
+ private final boolean _browseOnly;
private List<StackTraceElement> _closedStack = null;
@@ -147,11 +151,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
_connection = connection;
- _messageSelector = messageSelector;
_noLocal = noLocal;
_destination = destination;
_messageFactory = messageFactory;
@@ -164,10 +167,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
- _noConsume = noConsume;
+ _browseOnly = browseOnly;
+
+ try
+ {
+ if (messageSelector == null || "".equals(messageSelector.trim()))
+ {
+ _messageSelectorFilter = null;
+ }
+ else
+ {
+ _messageSelectorFilter = new JMSSelectorFilter(messageSelector);
+ }
+ }
+ catch (final AMQInternalException ie)
+ {
+ InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue");
+ ise.setLinkedException(ie);
+ throw ise;
+ }
// Force queue browsers not to use acknowledge modes.
- if (_noConsume)
+ if (_browseOnly)
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
@@ -186,7 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
checkPreConditions();
- return _messageSelector;
+ return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector();
}
public MessageListener getMessageListener() throws JMSException
@@ -345,6 +366,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _receiving.get();
}
+ public MessageFilter getMessageSelectorFilter()
+ {
+ return _messageSelectorFilter;
+ }
+
public Message receive() throws JMSException
{
return receive(0);
@@ -874,9 +900,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _autoClose;
}
- public boolean isNoConsume()
+ public boolean isBrowseOnly()
{
- return _noConsume;
+ return _browseOnly;
}
public void rollback()