diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d34290e007..e8508a62bc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -89,6 +89,7 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -894,7 +895,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic C consumer = _consumers.get(consumerTag); if (consumer != null) { - if (!consumer.isNoConsume()) // Normal Consumer + if (!consumer.isBrowseOnly()) // Normal Consumer { // Clean the Maps up first // Flush any pending messages for this consumerTag @@ -2572,7 +2573,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param queueName */ private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException + AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2600,7 +2601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; + AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException; private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate) throws JMSException @@ -2925,7 +2926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector); + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter()); } catch (FailoverException e) { @@ -3210,7 +3211,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : _consumers.values()) { - if (!consumer.isNoConsume()) + if (!consumer.isBrowseOnly()) { consumer.rollback(); } @@ -3397,7 +3398,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - if (consumer.isNoConsume()) + if (consumer.isBrowseOnly()) { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" |