From 68a3db2e1f58d1bacffbe62885519fe0a123060f Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Mon, 7 Nov 2011 08:27:31 +0000 Subject: QPID-2848: refactored message consumer: pre-aquire, capacity decisions are moved into consumer, renamed field noConsume into browseOnly, cleaned up selector filter code. Applied patch from Andrew MacBean , Oleksandr Rudyy and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1198642 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 13 ++- .../org/apache/qpid/client/AMQSession_0_10.java | 82 ++++---------- .../org/apache/qpid/client/AMQSession_0_8.java | 9 +- .../apache/qpid/client/BasicMessageConsumer.java | 44 ++++++-- .../qpid/client/BasicMessageConsumer_0_10.java | 125 ++++++++++++--------- .../qpid/client/BasicMessageConsumer_0_8.java | 19 +--- .../org/apache/qpid/filter/JMSSelectorFilter.java | 14 +-- .../java/org/apache/qpid/filter/MessageFilter.java | 4 +- .../apache/qpid/filter/JMSSelectorFilterTest.java | 108 ++++++++++++++++++ .../qpid/test/unit/message/TestAMQSession.java | 3 +- 10 files changed, 259 insertions(+), 162 deletions(-) create mode 100644 java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d34290e007..e8508a62bc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -89,6 +89,7 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -894,7 +895,7 @@ public abstract class AMQSession arguments = FieldTable.convertToMap(consumer.getArguments()); - - if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) - { - isTopic = consumer.getDestination() instanceof AMQTopic || - consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ; - - preAcquire = isTopic || (!consumer.isNoConsume() && - (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""))); - } - else - { - isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE; - - preAcquire = !consumer.isNoConsume() && - (isTopic || consumer.getMessageSelector() == null || - consumer.getMessageSelector().equals("")); - - arguments.putAll( - (Map) consumer.getDestination().getLink().getSubscription().getArgs()); - } - - boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - - if (consumer.getDestination().getLink() != null) - { - acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; - } - - getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), - acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, - preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - } - catch (JMSException e) + boolean preAcquire = consumer.isPreAcquire(); + + AMQDestination destination = consumer.getDestination(); + long capacity = consumer.getCapacity(); + + Map arguments = FieldTable.convertToMap(consumer.getArguments()); + + Link link = destination.getLink(); + if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) { - throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); + arguments.putAll((Map) link.getSubscription().getArgs()); } + boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + + getQpidSession().messageSubscribe + (queueName.toString(), String.valueOf(tag), + acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); if (capacity == 0) @@ -657,21 +632,6 @@ public class AMQSession_0_10 extends AMQSession 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (prefetch()) - { - capacity = getAMQConnection().getMaxPrefetch(); - } - return capacity; - } - /** * Create an 0_10 message producer */ @@ -836,7 +796,7 @@ public class AMQSession_0_10 extends AMQSession extends Closeable implements Messa /** The connection being used by this consumer */ protected final AMQConnection _connection; - protected final String _messageSelector; + protected final MessageFilter _messageSelectorFilter; private final boolean _noLocal; @@ -138,7 +142,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ private final boolean _autoClose; - private final boolean _noConsume; + private final boolean _browseOnly; private List _closedStack = null; @@ -147,11 +151,10 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; _connection = connection; - _messageSelector = messageSelector; _noLocal = noLocal; _destination = destination; _messageFactory = messageFactory; @@ -164,10 +167,28 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; - _noConsume = noConsume; + _browseOnly = browseOnly; + + try + { + if (messageSelector == null || "".equals(messageSelector.trim())) + { + _messageSelectorFilter = null; + } + else + { + _messageSelectorFilter = new JMSSelectorFilter(messageSelector); + } + } + catch (final AMQInternalException ie) + { + InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue"); + ise.setLinkedException(ie); + throw ise; + } // Force queue browsers not to use acknowledge modes. - if (_noConsume) + if (_browseOnly) { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } @@ -186,7 +207,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { checkPreConditions(); - return _messageSelector; + return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector(); } public MessageListener getMessageListener() throws JMSException @@ -345,6 +366,11 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _receiving.get(); } + public MessageFilter getMessageSelectorFilter() + { + return _messageSelectorFilter; + } + public Message receive() throws JMSException { return receive(0); @@ -874,9 +900,9 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa return _autoClose; } - public boolean isNoConsume() + public boolean isBrowseOnly() { - return _noConsume; + return _browseOnly; } public void rollback() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7c8ccf4cf9..71780f5714 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -26,17 +26,14 @@ import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.filter.JMSSelectorFilter; import org.apache.qpid.jms.Session; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; + import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,11 +48,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); + arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; - if (messageSelector != null && !messageSelector.equals("")) - { - try - { - _filter = new JMSSelectorFilter(messageSelector); - } - catch (AMQInternalException e) - { - throw new InvalidSelectorException("cannot create consumer because of selector issue"); - } - if (destination instanceof AMQQueue) - { - _preAcquire = false; - } - } - - // Destination setting overrides connection defaults - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (getSession().prefetch()) - { - capacity = _0_10session.getAMQConnection().getMaxPrefetch(); - } + + _preAcquire = evaluatePreAcquire(browseOnly, destination); + + _capacity = evaluateCapacity(destination); if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { @@ -123,7 +93,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) + if (_capacity > 0) { _0_10session.getQpidSession().messageFlow (getConsumerTagString(), MessageCreditUnit.MESSAGE, - capacity, + _capacity, Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (capacity == 0) + if (_capacity == 0) { _syncReceive.set(false); } @@ -536,4 +504,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer 0) + { + capacity = destination.getLink().getConsumerCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + return capacity; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 00acd5e866..cc061e35cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,16 +20,13 @@ */ package org.apache.qpid.client; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.filter.JMSSelectorFilter; import org.apache.qpid.framing.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,23 +38,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer 0) - { - JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector); - } - } - catch (AMQInternalException e) - { - throw new InvalidSelectorException("cannot create consumer because of selector issue"); - } + acknowledgeMode, browseOnly, autoClose); } void sendCancel() throws AMQException, FailoverException diff --git a/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java index 40718c6435..a1b4aff659 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java +++ b/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java @@ -26,16 +26,17 @@ import org.slf4j.LoggerFactory; public class JMSSelectorFilter implements MessageFilter { - /** - * this JMSSelectorFilter's logger - */ private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class); - private String _selector; - private BooleanExpression _matcher; + private final String _selector; + private final BooleanExpression _matcher; public JMSSelectorFilter(String selector) throws AMQInternalException { + if (selector == null || "".equals(selector)) + { + throw new IllegalArgumentException("Cannot create a JMSSelectorFilter with a null or empty selector string"); + } _selector = selector; if (_logger.isDebugEnabled()) { @@ -51,8 +52,7 @@ public class JMSSelectorFilter implements MessageFilter boolean match = _matcher.matches(message); if (_logger.isDebugEnabled()) { - _logger.debug(message + " match(" + match + ") selector(" + System - .identityHashCode(_selector) + "):" + _selector); + _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector); } return match; } diff --git a/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java b/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java index 62e4a28c1e..ec0e8ea4c0 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java +++ b/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java @@ -17,11 +17,11 @@ */ package org.apache.qpid.filter; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.message.AbstractJMSMessage; public interface MessageFilter { - boolean matches(AbstractJMSMessage message) throws AMQInternalException; + boolean matches(AbstractJMSMessage message); + String getSelector(); } diff --git a/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java b/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java new file mode 100644 index 0000000000..d4d8ea4350 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.filter; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.TestMessageHelper; + +public class JMSSelectorFilterTest extends TestCase +{ + + public void testEmptySelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter(""); + fail("Should not be able to create a JMSSelectorFilter with an empty selector"); + } + catch (IllegalArgumentException iae) + { + // pass + } + } + + public void testNullSelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter(null); + fail("Should not be able to create a JMSSelectorFilter with a null selector"); + } + catch (IllegalArgumentException iae) + { + // pass + } + } + + public void testInvalidSelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter("$%^"); + fail("Unparsable selector so expected AMQInternalException to be thrown"); + } + catch (AMQInternalException amqie) + { + // pass + } + } + + public void testSimpleSelectorFilter() throws Exception + { + MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select=5"); + + assertNotNull("Filter object is null", simpleSelectorFilter); + assertNotNull("Selector string is null", simpleSelectorFilter.getSelector()); + assertEquals("Unexpected selector", "select=5", simpleSelectorFilter.getSelector()); + assertTrue("Filter object is invalid", simpleSelectorFilter != null); + + final JMSTextMessage message = TestMessageHelper.newJMSTextMessage(); + + message.setIntProperty("select", 4); + assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message)); + message.setIntProperty("select", 5); + assertTrue("Selector didnt match when expected", simpleSelectorFilter.matches(message)); + message.setIntProperty("select", 6); + assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message)); + } + + public void testFailedMatchingFilter() throws Exception + { + MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select>4"); + + assertNotNull("Filter object is null", simpleSelectorFilter); + assertNotNull("Selector string is null", simpleSelectorFilter.getSelector()); + assertEquals("Unexpected selector", "select>4", simpleSelectorFilter.getSelector()); + assertTrue("Filter object is invalid", simpleSelectorFilter != null); + + final JMSTextMessage message = TestMessageHelper.newJMSTextMessage(); + + message.setStringProperty("select", "5"); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + message.setStringProperty("select", "elephant"); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + message.setBooleanProperty("select", false); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 6759b43387..b9d1476055 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -36,6 +36,7 @@ import org.apache.qpid.client.BasicMessageProducer_0_8; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -124,7 +125,7 @@ public class TestAMQSession extends AMQSession