diff options
12 files changed, 372 insertions, 172 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index d34290e007..e8508a62bc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -89,6 +89,7 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -894,7 +895,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic C consumer = _consumers.get(consumerTag); if (consumer != null) { - if (!consumer.isNoConsume()) // Normal Consumer + if (!consumer.isBrowseOnly()) // Normal Consumer { // Clean the Maps up first // Flush any pending messages for this consumerTag @@ -2572,7 +2573,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @param queueName */ private void consumeFromQueue(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException + AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2600,7 +2601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendConsume(C consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException; + AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException; private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate) throws JMSException @@ -2925,7 +2926,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector); + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter()); } catch (FailoverException e) { @@ -3210,7 +3211,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : _consumers.values()) { - if (!consumer.isNoConsume()) + if (!consumer.isBrowseOnly()) { consumer.rollback(); } @@ -3397,7 +3398,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - if (consumer.isNoConsume()) + if (consumer.isBrowseOnly()) { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 826ca46cca..c26fe98568 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.address.Node.ExchangeNode; import org.apache.qpid.client.messaging.address.Node.QueueNode; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; @@ -579,56 +580,30 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Registers the consumer with the broker */ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector, int tag) + boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException { - boolean preAcquire; - - long capacity = getCapacity(consumer.getDestination()); - - try - { - boolean isTopic; - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); - - if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL) - { - isTopic = consumer.getDestination() instanceof AMQTopic || - consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ; - - preAcquire = isTopic || (!consumer.isNoConsume() && - (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals(""))); - } - else - { - isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE; - - preAcquire = !consumer.isNoConsume() && - (isTopic || consumer.getMessageSelector() == null || - consumer.getMessageSelector().equals("")); - - arguments.putAll( - (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs()); - } - - boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - - if (consumer.getDestination().getLink() != null) - { - acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE; - } - - getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), - acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, - preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - } - catch (JMSException e) + boolean preAcquire = consumer.isPreAcquire(); + + AMQDestination destination = consumer.getDestination(); + long capacity = consumer.getCapacity(); + + Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + + Link link = destination.getLink(); + if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) { - throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); + arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs()); } + boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + + getQpidSession().messageSubscribe + (queueName.toString(), String.valueOf(tag), + acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); if (capacity == 0) @@ -657,21 +632,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private long getCapacity(AMQDestination destination) - { - long capacity = 0; - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (prefetch()) - { - capacity = getAMQConnection().getMaxPrefetch(); - } - return capacity; - } - /** * Create an 0_10 message producer */ @@ -836,7 +796,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic //only set if msg list is null try { - long capacity = getCapacity(consumer.getDestination()); + long capacity = consumer.getCapacity(); if (capacity == 0) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 369c8a6e9d..85fc857014 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -41,6 +41,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; @@ -333,13 +334,13 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, - String messageSelector, + MessageFilter messageSelector, int tag) throws AMQException, FailoverException { FieldTable arguments = FieldTableFactory.newFieldTable(); - if ((messageSelector != null) && !messageSelector.equals("")) + if (messageSelector != null) { - arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector); + arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector.getSelector()); } if (consumer.isAutoClose()) @@ -347,7 +348,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE); } - if (consumer.isNoConsume()) + if (consumer.isBrowseOnly()) { arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3b807591b0..b1975338b7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.filter.JMSSelectorFilter; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; @@ -31,6 +34,7 @@ import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -52,7 +56,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa /** The connection being used by this consumer */ protected final AMQConnection _connection; - protected final String _messageSelector; + protected final MessageFilter _messageSelectorFilter; private final boolean _noLocal; @@ -138,7 +142,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ private final boolean _autoClose; - private final boolean _noConsume; + private final boolean _browseOnly; private List<StackTraceElement> _closedStack = null; @@ -147,11 +151,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; _connection = connection; - _messageSelector = messageSelector; _noLocal = noLocal; _destination = destination; _messageFactory = messageFactory; @@ -164,10 +167,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; - _noConsume = noConsume; + _browseOnly = browseOnly; + + try + { + if (messageSelector == null || "".equals(messageSelector.trim())) + { + _messageSelectorFilter = null; + } + else + { + _messageSelectorFilter = new JMSSelectorFilter(messageSelector); + } + } + catch (final AMQInternalException ie) + { + InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue"); + ise.setLinkedException(ie); + throw ise; + } // Force queue browsers not to use acknowledge modes. - if (_noConsume) + if (_browseOnly) { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } @@ -186,7 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { checkPreConditions(); - return _messageSelector; + return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector(); } public MessageListener getMessageListener() throws JMSException @@ -345,6 +366,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return _receiving.get(); } + public MessageFilter getMessageSelectorFilter() + { + return _messageSelectorFilter; + } + public Message receive() throws JMSException { return receive(0); @@ -874,9 +900,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return _autoClose; } - public boolean isNoConsume() + public boolean isBrowseOnly() { - return _noConsume; + return _browseOnly; } public void rollback() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7c8ccf4cf9..71780f5714 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -26,17 +26,14 @@ import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.filter.JMSSelectorFilter; import org.apache.qpid.jms.Session; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; + import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,11 +49,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM protected final Logger _logger = LoggerFactory.getLogger(getClass()); /** - * The message selector filter associated with this consumer message selector - */ - private MessageFilter _filter = null; - - /** * The underlying QpidSession */ private AMQSession_0_10 _0_10session; @@ -64,7 +56,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM /** * Indicates whether this consumer receives pre-acquired messages */ - private boolean _preAcquire = true; + private final boolean _preAcquire; /** * Specify whether this consumer is performing a sync receive @@ -72,44 +64,22 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private final AtomicBoolean _syncReceive = new AtomicBoolean(false); private String _consumerTagString; - private long capacity = 0; + private final long _capacity; protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession session, AMQProtocolHandler protocolHandler, + AMQSession<?,?> session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); + arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; - if (messageSelector != null && !messageSelector.equals("")) - { - try - { - _filter = new JMSSelectorFilter(messageSelector); - } - catch (AMQInternalException e) - { - throw new InvalidSelectorException("cannot create consumer because of selector issue"); - } - if (destination instanceof AMQQueue) - { - _preAcquire = false; - } - } - - // Destination setting overrides connection defaults - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) - { - capacity = destination.getLink().getConsumerCapacity(); - } - else if (getSession().prefetch()) - { - capacity = _0_10session.getAMQConnection().getMaxPrefetch(); - } + + _preAcquire = evaluatePreAcquire(browseOnly, destination); + + _capacity = evaluateCapacity(destination); if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { @@ -123,7 +93,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - @Override public void setConsumerTag(int consumerTag) { super.setConsumerTag(consumerTag); @@ -149,7 +118,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { if (checkPreConditions(jmsMessage)) { - if (isMessageListenerSet() && capacity == 0) + if (isMessageListenerSet() && _capacity == 0) { messageFlow(); } @@ -160,7 +129,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { // if we are synchronously waiting for a message // and messages are not pre-fetched we then need to request another one - if(capacity == 0) + if(_capacity == 0) { messageFlow(); } @@ -238,9 +207,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM // TODO Use a tag for fiding out if message filtering is done here or by the broker. try { - if (_messageSelector != null && !_messageSelector.equals("")) + if (_messageSelectorFilter != null) { - messageOk = _filter.matches(message); + messageOk = _messageSelectorFilter.matches(message); } } catch (Exception e) @@ -275,11 +244,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM flushUnwantedMessage(message); } } - - // now we need to acquire this message if needed - // this is the case of queue with a message selector set - if (!_preAcquire && messageOk && !isNoConsume()) + else if (!_preAcquire && !isBrowseOnly()) { + // now we need to acquire this message if needed + // this is the case of queue with a message selector set if (_logger.isDebugEnabled()) { _logger.debug("filterMessage - trying to acquire message"); @@ -368,7 +336,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM super.setMessageListener(messageListener); try { - if (messageListener != null && capacity == 0) + if (messageListener != null && _capacity == 0) { messageFlow(); } @@ -408,11 +376,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (capacity == 0) + if (_capacity == 0) { _syncReceive.set(true); } - if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty()) + if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty()) { messageFlow(); } @@ -427,18 +395,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM (getConsumerTagString(), MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if (capacity > 0) + if (_capacity > 0) { _0_10session.getQpidSession().messageFlow (getConsumerTagString(), MessageCreditUnit.MESSAGE, - capacity, + _capacity, Option.UNRELIABLE); } _0_10session.syncDispatchQueue(); o = super.getMessageFromQueue(-1); } - if (capacity == 0) + if (_capacity == 0) { _syncReceive.set(false); } @@ -536,4 +504,51 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } } + + long getCapacity() + { + return _capacity; + } + + boolean isPreAcquire() + { + return _preAcquire; + } + + private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination) + { + boolean preAcquire; + if (browseOnly) + { + preAcquire = false; + } + else + { + boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE); + if (isQueue && getMessageSelectorFilter() != null) + { + preAcquire = false; + } + else + { + preAcquire = true; + } + } + return preAcquire; + } + + private long evaluateCapacity(AMQDestination destination) + { + long capacity = 0; + if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0) + { + capacity = destination.getLink().getConsumerCapacity(); + } + else if (getSession().prefetch()) + { + capacity = _0_10session.getAMQConnection().getMaxPrefetch(); + } + return capacity; + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 00acd5e866..cc061e35cb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,16 +20,13 @@ */ package org.apache.qpid.client; -import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.filter.JMSSelectorFilter; import org.apache.qpid.framing.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,23 +38,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException + boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, - acknowledgeMode, noConsume, autoClose); - try - { - - if (messageSelector != null && messageSelector.length() > 0) - { - JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector); - } - } - catch (AMQInternalException e) - { - throw new InvalidSelectorException("cannot create consumer because of selector issue"); - } + acknowledgeMode, browseOnly, autoClose); } void sendCancel() throws AMQException, FailoverException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java index 40718c6435..a1b4aff659 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java @@ -26,16 +26,17 @@ import org.slf4j.LoggerFactory; public class JMSSelectorFilter implements MessageFilter { - /** - * this JMSSelectorFilter's logger - */ private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class); - private String _selector; - private BooleanExpression _matcher; + private final String _selector; + private final BooleanExpression _matcher; public JMSSelectorFilter(String selector) throws AMQInternalException { + if (selector == null || "".equals(selector)) + { + throw new IllegalArgumentException("Cannot create a JMSSelectorFilter with a null or empty selector string"); + } _selector = selector; if (_logger.isDebugEnabled()) { @@ -51,8 +52,7 @@ public class JMSSelectorFilter implements MessageFilter boolean match = _matcher.matches(message); if (_logger.isDebugEnabled()) { - _logger.debug(message + " match(" + match + ") selector(" + System - .identityHashCode(_selector) + "):" + _selector); + _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector); } return match; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java index 62e4a28c1e..ec0e8ea4c0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java @@ -17,11 +17,11 @@ */ package org.apache.qpid.filter; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.message.AbstractJMSMessage; public interface MessageFilter { - boolean matches(AbstractJMSMessage message) throws AMQInternalException; + boolean matches(AbstractJMSMessage message); + String getSelector(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java new file mode 100644 index 0000000000..d4d8ea4350 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/filter/JMSSelectorFilterTest.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.filter; + +import junit.framework.TestCase; + +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.TestMessageHelper; + +public class JMSSelectorFilterTest extends TestCase +{ + + public void testEmptySelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter(""); + fail("Should not be able to create a JMSSelectorFilter with an empty selector"); + } + catch (IllegalArgumentException iae) + { + // pass + } + } + + public void testNullSelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter(null); + fail("Should not be able to create a JMSSelectorFilter with a null selector"); + } + catch (IllegalArgumentException iae) + { + // pass + } + } + + public void testInvalidSelectorFilter() throws Exception + { + try + { + new JMSSelectorFilter("$%^"); + fail("Unparsable selector so expected AMQInternalException to be thrown"); + } + catch (AMQInternalException amqie) + { + // pass + } + } + + public void testSimpleSelectorFilter() throws Exception + { + MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select=5"); + + assertNotNull("Filter object is null", simpleSelectorFilter); + assertNotNull("Selector string is null", simpleSelectorFilter.getSelector()); + assertEquals("Unexpected selector", "select=5", simpleSelectorFilter.getSelector()); + assertTrue("Filter object is invalid", simpleSelectorFilter != null); + + final JMSTextMessage message = TestMessageHelper.newJMSTextMessage(); + + message.setIntProperty("select", 4); + assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message)); + message.setIntProperty("select", 5); + assertTrue("Selector didnt match when expected", simpleSelectorFilter.matches(message)); + message.setIntProperty("select", 6); + assertFalse("Selector did match when not expected", simpleSelectorFilter.matches(message)); + } + + public void testFailedMatchingFilter() throws Exception + { + MessageFilter simpleSelectorFilter = new JMSSelectorFilter("select>4"); + + assertNotNull("Filter object is null", simpleSelectorFilter); + assertNotNull("Selector string is null", simpleSelectorFilter.getSelector()); + assertEquals("Unexpected selector", "select>4", simpleSelectorFilter.getSelector()); + assertTrue("Filter object is invalid", simpleSelectorFilter != null); + + final JMSTextMessage message = TestMessageHelper.newJMSTextMessage(); + + message.setStringProperty("select", "5"); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + message.setStringProperty("select", "elephant"); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + message.setBooleanProperty("select", false); + assertFalse("Selector matched when not expected", simpleSelectorFilter.matches(message)); + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 6759b43387..b9d1476055 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -36,6 +36,7 @@ import org.apache.qpid.client.BasicMessageProducer_0_8; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -124,7 +125,7 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe return false; } - public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException + public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException { } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index d6caf05d33..4eb328f091 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -261,8 +261,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException { QueueBrowser queueBrowser = selector == null ? - _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue); -// _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector); + _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector); Enumeration[] msgs = new Enumeration[browserEnumerationCount]; int[] msgCount = new int[browserEnumerationCount]; @@ -347,7 +346,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException { - String selector = MESSAGE_ID_PROPERTY + " % " + clients; + String selector = MESSAGE_ID_PROPERTY + " % " + clients + " = 0" ; checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 8c3c247e2b..b70b2f90e4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1,4 +1,3 @@ -package org.apache.qpid.test.client.destination; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,11 +18,13 @@ package org.apache.qpid.test.client.destination; * under the License. * */ - +package org.apache.qpid.test.client.destination; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.Hashtable; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -34,6 +35,7 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; @@ -475,13 +477,13 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { prod.send(jmsSession.createTextMessage("msg" + i) ); } - - for (int i=0; i< 9; i++) + Message msg = null; + for (int i=0; i< 10; i++) { - cons.receive(); + msg = cons.receive(RECEIVE_TIMEOUT); + assertNotNull("Should have received " + i + " message", msg); + assertEquals("Unexpected message received", "msg" + i, ((TextMessage)msg).getText()); } - Message msg = cons.receive(RECEIVE_TIMEOUT); - assertNotNull("Should have received the 10th message",msg); assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT)); msg.acknowledge(); for (int i=11; i<16; i++) @@ -1182,4 +1184,106 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("The consumer on the queue bound to the alt-exchange should receive the message",cons.receive(1000)); cons.close(); } + + public void testQueueBrowserWithSelectorAutoAcknowledgement() throws Exception + { + assertQueueBrowserWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorClientAcknowldgement() throws Exception + { + assertQueueBrowserWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testQueueBrowserWithSelectorTransactedSession() throws Exception + { + assertQueueBrowserWithSelector(Session.SESSION_TRANSACTED); + } + + public void testConsumerWithSelectorAutoAcknowledgement() throws Exception + { + assertConsumerWithSelector(Session.AUTO_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorClientAcknowldgement() throws Exception + { + assertConsumerWithSelector(Session.CLIENT_ACKNOWLEDGE); + } + + public void testConsumerWithSelectorTransactedSession() throws Exception + { + assertConsumerWithSelector(Session.SESSION_TRANSACTED); + } + + private void assertQueueBrowserWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List<Message> sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + QueueBrowser browser = session.createBrowser(queue, INDEX + "%2=0"); + _connection.start(); + + Enumeration<Message> enumaration = browser.getEnumeration(); + + int counter = 0; + int expectedIndex = 0; + while (enumaration.hasMoreElements()) + { + Message m = enumaration.nextElement(); + assertNotNull("Expected not null message at step " + counter, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + counter++; + } + assertEquals("Unexpected number of messsages received", 5, counter); + } + + private void assertConsumerWithSelector(int acknowledgement) throws Exception + { + String queueAddress = "ADDR:" + getTestQueueName() + ";{create: always}"; + + boolean transacted = acknowledgement == Session.SESSION_TRANSACTED; + Session session = _connection.createSession(transacted, acknowledgement); + + Queue queue = session.createQueue(queueAddress); + + final int numberOfMessages = 10; + List<Message> sentMessages = sendMessage(session, queue, numberOfMessages); + assertNotNull("Messages were not sent", sentMessages); + assertEquals("Unexpected number of messages were sent", numberOfMessages, sentMessages.size()); + + MessageConsumer consumer = session.createConsumer(queue, INDEX + "%2=0"); + + int expectedIndex = 0; + for (int i = 0; i < 5; i++) + { + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Expected not null message at step " + i, m); + int messageIndex = m.getIntProperty(INDEX); + assertEquals("Unexpected index", expectedIndex, messageIndex); + expectedIndex += 2; + + if (transacted) + { + session.commit(); + } + else if (acknowledgement == Session.CLIENT_ACKNOWLEDGE) + { + m.acknowledge(); + } + } + + Message m = consumer.receive(RECEIVE_TIMEOUT); + assertNull("Unexpected message received", m); + } } |