summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-07 08:27:31 +0000
committerKeith Wall <kwall@apache.org>2011-11-07 08:27:31 +0000
commit36d4650990d99bdbc29609567e56d846839edc46 (patch)
tree24d5fb1c1e9bb5d336e0601b016cfdb0a198c472
parentf1f1b41c39982ab393b73a099a8e479ee6251bd2 (diff)
downloadqpid-python-36d4650990d99bdbc29609567e56d846839edc46.tar.gz
QPID-2848: refactored message consumer: pre-aquire, capacity decisions are moved into consumer, renamed field noConsume into browseOnly, cleaned up selector filter code.
Applied patch from Andrew MacBean <andymacbean@gmail.com>, Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1198642 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java82
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java44
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java125
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java108
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java118
12 files changed, 372 insertions, 172 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index d34290e007..e8508a62bc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -89,6 +89,7 @@ import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -894,7 +895,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
- if (!consumer.isNoConsume()) // Normal Consumer
+ if (!consumer.isBrowseOnly()) // Normal Consumer
{
// Clean the Maps up first
// Flush any pending messages for this consumerTag
@@ -2572,7 +2573,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @param queueName
*/
private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2600,7 +2601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
throws JMSException
@@ -2925,7 +2926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter());
}
catch (FailoverException e)
{
@@ -3210,7 +3211,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
for (C consumer : _consumers.values())
{
- if (!consumer.isNoConsume())
+ if (!consumer.isBrowseOnly())
{
consumer.rollback();
}
@@ -3397,7 +3398,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- if (consumer.isNoConsume())
+ if (consumer.isBrowseOnly())
{
_dispatcherLogger.info("Received a message("
+ System.identityHashCode(message) + ")" + "["
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 826ca46cca..c26fe98568 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
@@ -579,56 +580,30 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Registers the consumer with the broker
*/
public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, int tag)
+ boolean nowait, MessageFilter messageSelector, int tag)
throws AMQException, FailoverException
{
- boolean preAcquire;
-
- long capacity = getCapacity(consumer.getDestination());
-
- try
- {
- boolean isTopic;
- Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
-
- if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
- {
- isTopic = consumer.getDestination() instanceof AMQTopic ||
- consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ;
-
- preAcquire = isTopic || (!consumer.isNoConsume() &&
- (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")));
- }
- else
- {
- isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE;
-
- preAcquire = !consumer.isNoConsume() &&
- (isTopic || consumer.getMessageSelector() == null ||
- consumer.getMessageSelector().equals(""));
-
- arguments.putAll(
- (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
- }
-
- boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-
- if (consumer.getDestination().getLink() != null)
- {
- acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
- }
-
- getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
- acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
- preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- }
- catch (JMSException e)
+ boolean preAcquire = consumer.isPreAcquire();
+
+ AMQDestination destination = consumer.getDestination();
+ long capacity = consumer.getCapacity();
+
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+ Link link = destination.getLink();
+ if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
{
- throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
+ arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
}
+ boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+
+ getQpidSession().messageSubscribe
+ (queueName.toString(), String.valueOf(tag),
+ acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
if (capacity == 0)
@@ -657,21 +632,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private long getCapacity(AMQDestination destination)
- {
- long capacity = 0;
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (prefetch())
- {
- capacity = getAMQConnection().getMaxPrefetch();
- }
- return capacity;
- }
-
/**
* Create an 0_10 message producer
*/
@@ -836,7 +796,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
//only set if msg list is null
try
{
- long capacity = getCapacity(consumer.getDestination());
+ long capacity = consumer.getCapacity();
if (capacity == 0)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 369c8a6e9d..85fc857014 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -41,6 +41,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -333,13 +334,13 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
AMQShortString queueName,
AMQProtocolHandler protocolHandler,
boolean nowait,
- String messageSelector,
+ MessageFilter messageSelector,
int tag) throws AMQException, FailoverException
{
FieldTable arguments = FieldTableFactory.newFieldTable();
- if ((messageSelector != null) && !messageSelector.equals(""))
+ if (messageSelector != null)
{
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector.getSelector());
}
if (consumer.isAutoClose())
@@ -347,7 +348,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
}
- if (consumer.isNoConsume())
+ if (consumer.isBrowseOnly())
{
arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3b807591b0..b1975338b7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
@@ -31,6 +34,7 @@ import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -52,7 +56,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
/** The connection being used by this consumer */
protected final AMQConnection _connection;
- protected final String _messageSelector;
+ protected final MessageFilter _messageSelectorFilter;
private final boolean _noLocal;
@@ -138,7 +142,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
private final boolean _autoClose;
- private final boolean _noConsume;
+ private final boolean _browseOnly;
private List<StackTraceElement> _closedStack = null;
@@ -147,11 +151,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
_connection = connection;
- _messageSelector = messageSelector;
_noLocal = noLocal;
_destination = destination;
_messageFactory = messageFactory;
@@ -164,10 +167,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
- _noConsume = noConsume;
+ _browseOnly = browseOnly;
+
+ try
+ {
+ if (messageSelector == null || "".equals(messageSelector.trim()))
+ {
+ _messageSelectorFilter = null;
+ }
+ else
+ {
+ _messageSelectorFilter = new JMSSelectorFilter(messageSelector);
+ }
+ }
+ catch (final AMQInternalException ie)
+ {
+ InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue");
+ ise.setLinkedException(ie);
+ throw ise;
+ }
// Force queue browsers not to use acknowledge modes.
- if (_noConsume)
+ if (_browseOnly)
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
@@ -186,7 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
checkPreConditions();
- return _messageSelector;
+ return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector();
}
public MessageListener getMessageListener() throws JMSException
@@ -345,6 +366,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _receiving.get();
}
+ public MessageFilter getMessageSelectorFilter()
+ {
+ return _messageSelectorFilter;
+ }
+
public Message receive() throws JMSException
{
return receive(0);
@@ -874,9 +900,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _autoClose;
}
- public boolean isNoConsume()
+ public boolean isBrowseOnly()
{
- return _noConsume;
+ return _browseOnly;
}
public void rollback()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 7c8ccf4cf9..71780f5714 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -26,17 +26,14 @@ import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.jms.Session;
-import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,11 +49,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
protected final Logger _logger = LoggerFactory.getLogger(getClass());
/**
- * The message selector filter associated with this consumer message selector
- */
- private MessageFilter _filter = null;
-
- /**
* The underlying QpidSession
*/
private AMQSession_0_10 _0_10session;
@@ -64,7 +56,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
/**
* Indicates whether this consumer receives pre-acquired messages
*/
- private boolean _preAcquire = true;
+ private final boolean _preAcquire;
/**
* Specify whether this consumer is performing a sync receive
@@ -72,44 +64,22 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
- private long capacity = 0;
+ private final long _capacity;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession session, AMQProtocolHandler protocolHandler,
+ AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+ arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
- if (messageSelector != null && !messageSelector.equals(""))
- {
- try
- {
- _filter = new JMSSelectorFilter(messageSelector);
- }
- catch (AMQInternalException e)
- {
- throw new InvalidSelectorException("cannot create consumer because of selector issue");
- }
- if (destination instanceof AMQQueue)
- {
- _preAcquire = false;
- }
- }
-
- // Destination setting overrides connection defaults
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (getSession().prefetch())
- {
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
- }
+
+ _preAcquire = evaluatePreAcquire(browseOnly, destination);
+
+ _capacity = evaluateCapacity(destination);
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
@@ -123,7 +93,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
-
@Override public void setConsumerTag(int consumerTag)
{
super.setConsumerTag(consumerTag);
@@ -149,7 +118,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (checkPreConditions(jmsMessage))
{
- if (isMessageListenerSet() && capacity == 0)
+ if (isMessageListenerSet() && _capacity == 0)
{
messageFlow();
}
@@ -160,7 +129,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
// if we are synchronously waiting for a message
// and messages are not pre-fetched we then need to request another one
- if(capacity == 0)
+ if(_capacity == 0)
{
messageFlow();
}
@@ -238,9 +207,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
// TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
- if (_messageSelector != null && !_messageSelector.equals(""))
+ if (_messageSelectorFilter != null)
{
- messageOk = _filter.matches(message);
+ messageOk = _messageSelectorFilter.matches(message);
}
}
catch (Exception e)
@@ -275,11 +244,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
flushUnwantedMessage(message);
}
}
-
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk && !isNoConsume())
+ else if (!_preAcquire && !isBrowseOnly())
{
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
if (_logger.isDebugEnabled())
{
_logger.debug("filterMessage - trying to acquire message");
@@ -368,7 +336,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
super.setMessageListener(messageListener);
try
{
- if (messageListener != null && capacity == 0)
+ if (messageListener != null && _capacity == 0)
{
messageFlow();
}
@@ -408,11 +376,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
{
messageFlow();
}
@@ -427,18 +395,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
(getConsumerTagString(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
- if (capacity > 0)
+ if (_capacity > 0)
{
_0_10session.getQpidSession().messageFlow
(getConsumerTagString(),
MessageCreditUnit.MESSAGE,
- capacity,
+ _capacity,
Option.UNRELIABLE);
}
_0_10session.syncDispatchQueue();
o = super.getMessageFromQueue(-1);
}
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(false);
}
@@ -536,4 +504,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
}
+
+ long getCapacity()
+ {
+ return _capacity;
+ }
+
+ boolean isPreAcquire()
+ {
+ return _preAcquire;
+ }
+
+ private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+ {
+ boolean preAcquire;
+ if (browseOnly)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
+ if (isQueue && getMessageSelectorFilter() != null)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ preAcquire = true;
+ }
+ }
+ return preAcquire;
+ }
+
+ private long evaluateCapacity(AMQDestination destination)
+ {
+ long capacity = 0;
+ if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+ {
+ capacity = destination.getLink().getConsumerCapacity();
+ }
+ else if (getSession().prefetch())
+ {
+ capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ }
+ return capacity;
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 00acd5e866..cc061e35cb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,16 +20,13 @@
*/
package org.apache.qpid.client;
-import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,23 +38,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
- acknowledgeMode, noConsume, autoClose);
- try
- {
-
- if (messageSelector != null && messageSelector.length() > 0)
- {
- JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
- }
- }
- catch (AMQInternalException e)
- {
- throw new InvalidSelectorException("cannot create consumer because of selector issue");
- }
+ acknowledgeMode, browseOnly, autoClose);
}
void sendCancel() throws AMQException, FailoverException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
index 40718c6435..a1b4aff659 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
@@ -26,16 +26,17 @@ import org.slf4j.LoggerFactory;
public class JMSSelectorFilter implements MessageFilter
{
- /**
- * this JMSSelectorFilter's logger
- */
private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class);
- private String _selector;
- private BooleanExpression _matcher;
+ private final String _selector;
+ private final BooleanExpression _matcher;
public JMSSelectorFilter(String selector) throws AMQInternalException
{
+ if (selector == null || "".equals(selector))
+ {
+ throw new IllegalArgumentException("Cannot create a JMSSelectorFilter with a null or empty selector string");
+ }
_selector = selector;
if (_logger.isDebugEnabled())
{
@@ -51,8 +52,7 @@ public class JMSSelectorFilter implements MessageFilter
boolean match = _matcher.matches(message);
if (_logger.isDebugEnabled())
{
- _logger.debug(message + " match(" + match + ") selector(" + System
- .identityHashCode(_selector) + "):" + _selector);
+ _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector);
}
return match;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
index 62e4a28c1e..ec0e8ea4c0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
@@ -17,11 +17,11 @@
*/
package org.apache.qpid.filter;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.message.AbstractJMSMessage;
public interface MessageFilter
{
- boolean matches(AbstractJMSMessage message) throws AMQInternalException;
+ boolean matches(AbstractJMSMessage message);
+ String getSelector();
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java
new file mode 100644
index 0000000000..d4d8ea4350
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.filter;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.TestMessageHelper;
+
+public class JMSSelectorFilterTest extends TestCase
+{
+
+ public void testEmptySelectorFilter() throws Exception
+ {
+ try
+ {
+ new JMSSelectorFilter("");
+ fail("Should not be able to create a JMSSelectorFilter with an empty selector");
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // pass
+ }
+ }
+
+ public void testNullSelectorFilter() throws Exception
+ {
+ try
+ {
+ new JMSSelectorFilter(null);
+ fail("Should not be able to create a JMSSelectorFilter with a null selector");
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // pass
+ }
+ }
+
+ public void testInvalidSelectorFilter() throws Exception
+ {
+ try
+ {
+ new JMSSelectorFilter("$%^");
+ fail("Unparsable selector so expected AMQInternalException to be thrown");
+ }
+ catch (AMQInternalException amqie)
+ {
+ // pass
+ }
+ }
+
+ public void testSimpleSelectorFilter() throws Exception
+ {
+ MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select=5");
+
+ assertNotNull("Filter object is null", simpleSelectorFilter);
+ assertNotNull("Selector string is null", simpleSelectorFilter.getSelector());
+ assertEquals("Unexpected selector", "select=5", simpleSelectorFilter.getSelector());
+ assertTrue("Filter object is invalid", simpleSelectorFilter != null);
+
+ final JMSTextMessage message = TestMessageHelper.newJMSTextMessage();
+
+ message.setIntProperty("select", 4);
+ assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message));
+ message.setIntProperty("select", 5);
+ assertTrue("Selector didnt match when expected", simpleSelectorFilter.matches(message));
+ message.setIntProperty("select", 6);
+ assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message));
+ }
+
+ public void testFailedMatchingFilter() throws Exception
+ {
+ MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select>4");
+
+ assertNotNull("Filter object is null", simpleSelectorFilter);
+ assertNotNull("Selector string is null", simpleSelectorFilter.getSelector());
+ assertEquals("Unexpected selector", "select>4", simpleSelectorFilter.getSelector());
+ assertTrue("Filter object is invalid", simpleSelectorFilter != null);
+
+ final JMSTextMessage message = TestMessageHelper.newJMSTextMessage();
+
+ message.setStringProperty("select", "5");
+ assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+ message.setStringProperty("select", "elephant");
+ assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+ message.setBooleanProperty("select", false);
+ assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message));
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 6759b43387..b9d1476055 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -36,6 +36,7 @@ import org.apache.qpid.client.BasicMessageProducer_0_8;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -124,7 +125,7 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return false;
}
- public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
+ public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException
{
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index d6caf05d33..4eb328f091 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -261,8 +261,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException
{
QueueBrowser queueBrowser = selector == null ?
- _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue);
-// _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);
+ _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);
Enumeration[] msgs = new Enumeration[browserEnumerationCount];
int[] msgCount = new int[browserEnumerationCount];
@@ -347,7 +346,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException
{
- String selector = MESSAGE_ID_PROPERTY + " % " + clients;
+ String selector = MESSAGE_ID_PROPERTY + " % " + clients + " = 0" ;
checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 8c3c247e2b..b70b2f90e4 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.test.client.destination;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,11 +18,13 @@ package org.apache.qpid.test.client.destination;
* under the License.
*
*/
-
+package org.apache.qpid.test.client.destination;
import java.util.Collections;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -34,6 +35,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
@@ -475,13 +477,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
prod.send(jmsSession.createTextMessage("msg" + i) );
}
-
- for (int i=0; i< 9; i++)
+ Message msg = null;
+ for (int i=0; i< 10; i++)
{
- cons.receive();
+ msg = cons.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Should have received " + i + " message", msg);
+ assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText());
}
- Message msg = cons.receive(RECEIVE_TIMEOUT);
- assertNotNull("Should have received the 10th message",msg);
assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
msg.acknowledge();
for (int i=11; i<16; i++)
@@ -1182,4 +1184,106 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000));
cons.close();
}
+
+ public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testQueueBrowserWithSelectorTransactedSession() throws Exception
+ {
+ assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED);
+ }
+
+ public void testConsumerWithSelectorAutoAcknowledgement() throws Exception
+ {
+ assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void testConsumerWithSelectorClientAcknowldgement() throws Exception
+ {
+ assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testConsumerWithSelectorTransactedSession() throws Exception
+ {
+ assertConsumerWithSelector(Session.SESSION_TRANSACTED);
+ }
+
+ private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception
+ {
+ String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+ boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+ Session session = _connection.createSession(transacted, acknowledgement);
+
+ Queue queue = session.createQueue(queueAddress);
+
+ final int numberOfMessages = 10;
+ List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+ assertNotNull("Messages were not sent", sentMessages);
+ assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+ QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0");
+ _connection.start();
+
+ Enumeration<Message> enumaration = browser.getEnumeration();
+
+ int counter = 0;
+ int expectedIndex = 0;
+ while (enumaration.hasMoreElements())
+ {
+ Message m = enumaration.nextElement();
+ assertNotNull("Expected not null message at step " + counter, m);
+ int messageIndex = m.getIntProperty(INDEX);
+ assertEquals("Unexpected index", expectedIndex, messageIndex);
+ expectedIndex += 2;
+ counter++;
+ }
+ assertEquals("Unexpected number of messsages received", 5, counter);
+ }
+
+ private void assertConsumerWithSelector(int acknowledgement) throws Exception
+ {
+ String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}";
+
+ boolean transacted = acknowledgement == Session.SESSION_TRANSACTED;
+ Session session = _connection.createSession(transacted, acknowledgement);
+
+ Queue queue = session.createQueue(queueAddress);
+
+ final int numberOfMessages = 10;
+ List<Message> sentMessages = sendMessage(session, queue, numberOfMessages);
+ assertNotNull("Messages were not sent", sentMessages);
+ assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size());
+
+ MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0");
+
+ int expectedIndex = 0;
+ for (int i = 0; i < 5; i++)
+ {
+ Message m = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Expected not null message at step " + i, m);
+ int messageIndex = m.getIntProperty(INDEX);
+ assertEquals("Unexpected index", expectedIndex, messageIndex);
+ expectedIndex += 2;
+
+ if (transacted)
+ {
+ session.commit();
+ }
+ else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE)
+ {
+ m.acknowledge();
+ }
+ }
+
+ Message m = consumer.receive(RECEIVE_TIMEOUT);
+ assertNull("Unexpected message received", m);
+ }
}