summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java13
1 files changed, 7 insertions, 6 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d34290e007..e8508a62bc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -89,6 +89,7 @@ import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -894,7 +895,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
- if (!consumer.isNoConsume()) // Normal Consumer
+ if (!consumer.isBrowseOnly()) // Normal Consumer
{
// Clean the Maps up first
// Flush any pending messages for this consumerTag
@@ -2572,7 +2573,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param queueName
*/
private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2600,7 +2601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
throws JMSException
@@ -2925,7 +2926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter());
}
catch (FailoverException e)
{
@@ -3210,7 +3211,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
for (C consumer : _consumers.values())
{
- if (!consumer.isNoConsume())
+ if (!consumer.isBrowseOnly())
{
consumer.rollback();
}
@@ -3397,7 +3398,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- if (consumer.isNoConsume())
+ if (consumer.isBrowseOnly())
{
_dispatcherLogger.info("Received a message("
+ System.identityHashCode(message) + ")" + "["