diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-17 17:41:28 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-17 17:41:28 +0000 |
commit | 2d3a1f587a2201eed232cdc4b4ee589ea52e3606 (patch) | |
tree | df3ffc561d223df11e6c5d1034f855a4da97549f /qpid | |
parent | 2e60cf6ce254d749890c5740d0d3b15e2b1e41a3 (diff) | |
download | qpid-python-2d3a1f587a2201eed232cdc4b4ee589ea52e3606.tar.gz |
QPID-6395 : [Java Broker] add support for queue default filters, and filters solely on arrival time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
31 files changed, 792 insertions, 171 deletions
diff --git a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java index a831e1ebd9..c09a2b4f72 100644 --- a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java +++ b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java @@ -278,6 +278,13 @@ public class AttributeAnnotationValidator extends AbstractProcessor return true; } + + if(typeUtils.isSameType(type,elementUtils.getTypeElement("java.lang.Object").asType())) + { + return true; + } + + if(typeUtils.isSameType(type, elementUtils.getTypeElement("java.lang.String").asType())) { return true; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 597fc44e4c..de796a846a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -87,6 +87,12 @@ class HeadersBinding +"' with arguments: " + _binding.getArguments()); _filter = new MessageFilter() { + @Override + public String getName() + { + return ""; + } + @Override public boolean matches(Filterable message) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java new file mode 100644 index 0000000000..dbd6a5f6f6 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.common.AMQPFilterTypes; + +public final class ArrivalTimeFilter implements MessageFilter +{ + private final long _startingFrom; + + public ArrivalTimeFilter(final long startingFrom) + { + _startingFrom = startingFrom; + } + + @Override + public String getName() + { + return AMQPFilterTypes.REPLAY_PERIOD.toString(); + } + + @Override + public boolean matches(final Filterable message) + { + return message.getArrivalTime() >= _startingFrom; + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java new file mode 100644 index 0000000000..28e05eaa52 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import java.util.List; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.server.plugin.MessageFilterFactory; +import org.apache.qpid.server.plugin.PluggableService; + +@PluggableService +public final class ArrivalTimeFilterFactory implements MessageFilterFactory +{ + + @Override + public MessageFilter newInstance(final List<Object> arguments) + { + if(arguments == null || arguments.size() != 1) + { + throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); + } + Object arg = arguments.get(0); + long startingFrom; + if(arg instanceof Number) + { + startingFrom = ((Number)arg).longValue(); + } + else + { + startingFrom = Long.parseLong(String.valueOf(arg)); + } + return new ArrivalTimeFilter(startingFrom); + } + + @Override + public String getType() + { + return AMQPFilterTypes.REPLAY_PERIOD.toString(); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java index 69fc520a8f..ad14fa423a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -14,26 +14,62 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. - * + * under the License. * + * */ package org.apache.qpid.server.filter; -// -// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> -// import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -public interface FilterManager +public class FilterManager { - void add(MessageFilter filter); - void remove(MessageFilter filter); + private final Map<String, MessageFilter> _filters = new ConcurrentHashMap<>(); + + public FilterManager() + { + } + + public void add(String name, MessageFilter filter) + { + _filters.put(name, filter); + } + + public boolean allAllow(Filterable msg) + { + for (MessageFilter filter : _filters.values()) + { + if (!filter.matches(msg)) + { + return false; + } + } + return true; + } + + public Iterator<MessageFilter> filters() + { + return _filters.values().iterator(); + } + + public boolean hasFilters() + { + return !_filters.isEmpty(); + } + + public boolean hasFilter(final String name) + { + return _filters.containsKey(name); + } - boolean allAllow(Filterable msg); + @Override + public String toString() + { + return _filters.toString(); + } - Iterator<MessageFilter> filters(); - boolean hasFilters(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java index a159a8506b..28f7fe3554 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.filter; +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.common.AMQPFilterTypes; @@ -27,8 +29,6 @@ import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; -import java.util.Map; - public class FilterManagerFactory { @@ -54,20 +54,13 @@ public class FilterManagerFactory if (selector instanceof String && !selector.equals("")) { - manager = new SimpleFilterManager(); + manager = new FilterManager(); try { - manager.add(new JMSSelectorFilter((String)selector)); - } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); + MessageFilter filter = new JMSSelectorFilter((String)selector); + manager.add(filter.getName(), filter); } - catch (TokenMgrError e) + catch (ParseException | SelectorParsingException | TokenMgrError e) { throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index d0b1670a45..6b8ae2f552 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -26,12 +26,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.WeakHashMap; + import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.queue.AMQQueue; public class FilterSupport @@ -57,15 +59,7 @@ public class FilterSupport { selector = new JMSSelectorFilter(selectorString); } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (TokenMgrError e) + catch (ParseException | SelectorParsingException | TokenMgrError e) { throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); } @@ -119,6 +113,7 @@ public class FilterSupport } } + @PluggableService public static final class NoLocalFilter implements MessageFilter { private final MessageSource _queue; @@ -128,6 +123,12 @@ public class FilterSupport _queue = queue; } + @Override + public String getName() + { + return AMQPFilterTypes.NO_LOCAL.toString(); + } + public boolean matches(Filterable message) { @@ -165,6 +166,8 @@ public class FilterSupport { return _queue != null ? _queue.hashCode() : 0; } + + } static final class CompoundFilter implements MessageFilter @@ -178,6 +181,12 @@ public class FilterSupport _jmsSelectorFilter = jmsSelectorFilter; } + @Override + public String getName() + { + return ""; + } + public boolean matches(Filterable message) { return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); @@ -216,5 +225,7 @@ public class FilterSupport result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); return result; } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java index 589e888059..295f9ae074 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java @@ -34,6 +34,10 @@ public interface Filterable Object getConnectionReference(); + long getMessageNumber(); + + long getArrivalTime(); + public class Factory { @@ -41,6 +45,7 @@ public interface Filterable { return new Filterable() { + @Override public AMQMessageHeader getMessageHeader() { @@ -64,6 +69,18 @@ public interface Filterable { return message.getConnectionReference(); } + + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public long getArrivalTime() + { + return message.getArrivalTime(); + } }; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 744e4e4e9d..a36049cd23 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -25,14 +25,18 @@ import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.log4j.Logger; + +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.BooleanExpression; import org.apache.qpid.filter.FilterableMessage; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.SelectorParser; import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.plugin.PluggableService; +@PluggableService public class JMSSelectorFilter implements MessageFilter { private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); @@ -46,6 +50,12 @@ public class JMSSelectorFilter implements MessageFilter _matcher = new SelectorParser().parse(selector); } + @Override + public String getName() + { + return AMQPFilterTypes.JMS_SELECTOR.toString(); + } + public boolean matches(Filterable message) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java new file mode 100644 index 0000000000..683906dc88 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import java.util.List; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.server.plugin.MessageFilterFactory; +import org.apache.qpid.server.plugin.PluggableService; + +@PluggableService +public final class JMSSelectorFilterFactory implements MessageFilterFactory +{ + public String getType() + { + return AMQPFilterTypes.JMS_SELECTOR.toString(); + } + + @Override + public MessageFilter newInstance(final List<Object> arguments) + { + if(arguments == null || arguments.size() != 1) + { + throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); + } + Object arg = arguments.get(0); + try + { + return new JMSSelectorFilter(String.valueOf(arg)); + } + catch (ParseException | TokenMgrError | SelectorParsingException e) + { + throw new IllegalArgumentException("Cannot create an JMS Selector from '" + arg + "'", e); + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java index d7dbbea166..226d646efd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -22,5 +22,6 @@ package org.apache.qpid.server.filter; public interface MessageFilter { + String getName(); boolean matches(Filterable message); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java deleted file mode 100644 index 111fb6a333..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.filter; - -import org.apache.log4j.Logger; - -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class SimpleFilterManager implements FilterManager -{ - private final Logger _logger = Logger.getLogger(SimpleFilterManager.class); - - private final ConcurrentLinkedQueue<MessageFilter> _filters; - private String _toString = ""; - - public SimpleFilterManager() - { - _logger.debug("Creating SimpleFilterManager"); - _filters = new ConcurrentLinkedQueue<MessageFilter>(); - } - - public SimpleFilterManager(JMSSelectorFilter messageFilter) - { - this(); - add(messageFilter); - } - - public void add(MessageFilter filter) - { - _filters.add(filter); - updateStringValue(); - } - - public void remove(MessageFilter filter) - { - _filters.remove(filter); - updateStringValue(); - } - - public boolean allAllow(Filterable msg) - { - for (MessageFilter filter : _filters) - { - if (!filter.matches(msg)) - { - return false; - } - } - return true; - } - - @Override - public Iterator<MessageFilter> filters() - { - return _filters.iterator(); - } - - public boolean hasFilters() - { - return !_filters.isEmpty(); - } - - - @Override - public String toString() - { - return _toString; - } - - private void updateStringValue() - { - StringBuilder toString = new StringBuilder(); - for (MessageFilter filter : _filters) - { - toString.append(filter.toString()); - toString.append(","); - } - - if (_filters.size() > 0) - { - //Remove the last ',' - toString.deleteCharAt(toString.length()-1); - } - _toString = toString.toString(); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 46fbaac3f2..b69c17dca0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.model; import java.util.Collection; +import java.util.List; +import java.util.Map; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.store.MessageDurability; @@ -48,6 +50,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String QUEUE_FLOW_STOPPED = "queueFlowStopped"; String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; + String DEFAULT_FILTERS = "defaultFilters"; String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) @@ -155,6 +158,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute long getMaximumMessageTtl(); + @ManagedAttribute + Map<String, Map<String,List<Object>>> getDefaultFilters(); + //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java new file mode 100644 index 0000000000..9c76f5590e --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import java.util.List; + +import org.apache.qpid.server.filter.MessageFilter; + +public interface MessageFilterFactory extends Pluggable +{ + MessageFilter newInstance(List<Object> arguments); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java index f70afb12ba..e6100efda7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java @@ -19,8 +19,11 @@ package org.apache.qpid.server.plugin; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.ServiceLoader; import org.apache.log4j.Logger; @@ -47,6 +50,16 @@ public class QpidServiceLoader return instancesOf(clazz, true); } + public <C extends Pluggable> Map<String,C> getInstancesByType(Class<C> clazz) + { + Map<String,C> instances = new HashMap<>(); + for(C instance : instancesOf(clazz)) + { + instances.put(instance.getType(), instance); + } + return Collections.unmodifiableMap(instances); + } + private <C extends Pluggable> Iterable<C> instancesOf(Class<C> clazz, boolean atLeastOne) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index f3711ee53a..02798e9834 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; @@ -51,6 +52,7 @@ import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -74,6 +76,8 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.plugin.MessageFilterFactory; +import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; @@ -185,6 +189,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @ManagedAttributeField private MessageDurability _messageDurability; + @ManagedAttributeField + private Map<String, Map<String,List<Object>>> _defaultFilters; + private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name private final Set<NotificationCheck> _notificationChecks = @@ -246,6 +253,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final QueueRunner _queueRunner = new QueueRunner(this); private boolean _closing; + private final ConcurrentHashMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>(); protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { @@ -449,6 +457,40 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES); + + if(_defaultFilters != null) + { + QpidServiceLoader qpidServiceLoader = new QpidServiceLoader(); + final Map<String, MessageFilterFactory> messageFilterFactories = + qpidServiceLoader.getInstancesByType(MessageFilterFactory.class); + + for (Map.Entry<String,Map<String,List<Object>>> entry : _defaultFilters.entrySet()) + { + String name = String.valueOf(entry.getKey()); + Map<String, List<Object>> filterValue = entry.getValue(); + if(filterValue.size() == 1) + { + String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); + MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); + if(filterFactory != null) + { + List<Object> filterArguments = filterValue.values().iterator().next(); + _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); + } + else + { + throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + messageFilterFactories.keySet()); + } + } + else + { + throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterValue); + + } + + } + } + updateAlertChecks(); } @@ -554,6 +596,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override + public Map<String, Map<String, List<Object>>> getDefaultFilters() + { + return _defaultFilters; + } + + @Override public final MessageDurability getMessageDurability() { return _messageDurability; @@ -602,7 +650,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - final FilterManager filters, + FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, EnumSet<ConsumerImpl.Option> optionSet) @@ -698,7 +746,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { throw new ExistingConsumerPreventsExclusive(); } - + if(!_defaultFiltersMap.isEmpty()) + { + if(filters == null) + { + filters = new FilterManager(); + } + for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet()) + { + if(!filters.hasFilter(filter.getKey())) + { + filters.add(filter.getKey(), filter.getValue()); + } + } + } QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 49732e8345..ae1fe12c92 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -62,6 +62,9 @@ public class QueueArgumentsConverter public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters"; + + /** * No-local queue argument is used to support the no-local feature of Durable Subscribers. */ @@ -99,6 +102,7 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); + ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 1c42d9b6fe..497a66ab5e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -38,7 +38,6 @@ import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -103,16 +102,23 @@ public class MockConsumer implements ConsumerTarget { if(_messageIds != null) { - SimpleFilterManager filters = new SimpleFilterManager(); - filters.add(new MessageFilter() + FilterManager filters = new FilterManager(); + MessageFilter filter = new MessageFilter() { @Override + public String getName() + { + return ""; + } + + @Override public boolean matches(final Filterable message) { final String messageId = message.getMessageHeader().getMessageId(); return _messageIds.contains(messageId); } - }); + }; + filters.add(filter.getName(), filter); return filters; } else diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 288a4f946c..8632d04048 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -33,13 +33,16 @@ import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; @@ -257,6 +260,43 @@ public class ServerSessionDelegate extends SessionDelegate return; } + + if(method.hasArguments() && method.getArguments().containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) + { + Object value = method.getArguments().get(AMQPFilterTypes.REPLAY_PERIOD.toString()); + final long period; + if(value instanceof Number) + { + period = ((Number)value).longValue(); + } + else if(value instanceof String) + { + try + { + period = Long.parseLong(value.toString()); + } + catch (NumberFormatException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + return; + } + } + else + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + return; + } + final long startingFrom = System.currentTimeMillis() - (1000l * period); + if(filterManager == null) + { + filterManager = new FilterManager(); + } + MessageFilter filter = new ArrivalTimeFilter(startingFrom); + filterManager.add(filter.getName(), filter); + + } + + ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination, method.getAcceptMode(), method.getAcquireMode(), @@ -1596,4 +1636,5 @@ public class ServerSessionDelegate extends SessionDelegate { } } + } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 6108de2fda..9afa7c393f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -60,11 +60,11 @@ import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; @@ -670,7 +670,7 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param sources the queues to subscribe to * @param acks Are acks enabled for this subscriber - * @param filters Filters to apply to this subscriber + * @param arguments Filters to apply to this subscriber * * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests @@ -678,7 +678,7 @@ public class AMQChannel * @throws org.apache.qpid.AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, - FieldTable filters, boolean exclusive, boolean noLocal) + FieldTable arguments, boolean exclusive, boolean noLocal) throws MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, @@ -697,19 +697,19 @@ public class AMQChannel ConsumerTarget_0_8 target; EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); - if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) + if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager); } else if(acks) { - target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -729,23 +729,66 @@ public class AMQChannel try { - FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); + FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments)); if(noLocal) { if(filterManager == null) { - filterManager = new SimpleFilterManager(); + filterManager = new FilterManager(); } final Object connectionReference = getConnectionReference(); - filterManager.add(new MessageFilter() + MessageFilter filter = new MessageFilter() { + + @Override + public String getName() + { + return AMQPFilterTypes.NO_LOCAL.toString(); + } + @Override public boolean matches(final Filterable message) { return message.getConnectionReference() != connectionReference; } - }); + }; + filterManager.add(filter.getName(), filter); + } + + if(arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) + { + Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString()); + final long period; + if(value instanceof Number) + { + period = ((Number)value).longValue(); + } + else if(value instanceof String) + { + try + { + period = Long.parseLong(value.toString()); + } + catch (NumberFormatException e) + { + throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + } + } + else + { + throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + } + + final long startingFrom = System.currentTimeMillis() - (1000l * period); + if(filterManager == null) + { + filterManager = new FilterManager(); + } + MessageFilter filter = new ArrivalTimeFilter(startingFrom); + filterManager.add(filter.getName(), filter); + } + for(MessageSource source : sources) { ConsumerImpl sub = diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 85a0b559c9..c952a3c585 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -65,8 +65,8 @@ import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -154,15 +154,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS actualFilters.put(entry.getKey(), entry.getValue()); } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) + catch (ParseException | SelectorParsingException e) { Error error = new Error(); error.setCondition(AmqpError.INVALID_FIELD); @@ -374,8 +366,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { name = getEndpoint().getName(); } + + FilterManager filters = null; + if(messageFilter != null) + { + filters = new FilterManager(); + filters.add(messageFilter.getName(), messageFilter); + } + _consumer = _queue.addConsumer(_target, - messageFilter == null ? null : new SimpleFilterManager(messageFilter), + filters, Message_1_0.class, name, options); } catch (MessageSource.ExistingExclusiveConsumer e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index a71555480f..2eeea4c967 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -89,6 +89,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte private RejectBehaviour _rejectBehaviour; + private Map<String,Object> _consumerArguments; + public static final int QUEUE_TYPE = 1; public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; @@ -299,6 +301,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR); _rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase()); + _consumerArguments = binding.getConsumerOptions(); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) @@ -718,6 +721,11 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte return result; } + public Map<String, Object> getConsumerArguments() + { + return _consumerArguments; + } + public Reference getReference() throws NamingException { return new Reference( diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9cef1f8dce..3c947043c6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -187,6 +187,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } final FieldTable ft = FieldTableFactory.newFieldTable(); + if(destination.getConsumerArguments() != null) + { + ft.addAll(FieldTable.convertToFieldTable(destination.getConsumerArguments())); + } // rawSelector is used by HeadersExchange and is not a JMS Selector if (rawSelector != null) { @@ -203,6 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal); } + _arguments = ft; _addressType = _destination.getAddressType(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 1d7bb6087a..4f2715bd7b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -69,6 +69,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE); } + _topicDestinationCache = session.getTopicDestinationCache(); _queueDestinationCache = session.getQueueDestinationCache(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 483fbaea50..d033bf86c2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -30,7 +30,8 @@ public enum AMQPFilterTypes JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), AUTO_CLOSE("x-filter-auto-close"), - NO_LOCAL("x-qpid-no-local"); + NO_LOCAL("x-qpid-no-local"), + REPLAY_PERIOD("x-qpid-replay-period"); /** The identifying string for the filter type. */ private final AMQShortString _value; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 77902c3531..3ea99ce4ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -20,15 +20,16 @@ */ package org.apache.qpid.url; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import java.net.URISyntaxException; -import java.util.HashMap; - public class AMQBindingURL implements BindingURL { private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class); @@ -135,6 +136,20 @@ public class AMQBindingURL implements BindingURL return _options.get(key); } + @Override + public Map<String,Object> getConsumerOptions() + { + Map<String,Object> options = new HashMap<>(); + for(Map.Entry<String,String> option : _options.entrySet()) + { + if(!NON_CONSUMER_OPTIONS.contains(option.getKey())) + { + options.put(option.getKey(), option.getValue()); + } + } + return options; + } + public void setOption(String key, String value) { _options.put(key, value); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 80a1ae540b..91f80ff88c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.url; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.qpid.framing.AMQShortString; /* @@ -47,6 +53,18 @@ public interface BindingURL */ public static final String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour"; + public static final Set<String> NON_CONSUMER_OPTIONS = + Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(OPTION_EXCLUSIVE, + OPTION_AUTODELETE, + OPTION_DURABLE, + OPTION_BROWSE, + OPTION_ROUTING_KEY, + OPTION_BINDING_KEY, + OPTION_EXCHANGE_AUTODELETE, + OPTION_EXCHANGE_DURABLE, + OPTION_EXCHANGE_DURABLE, + OPTION_REJECT_BEHAVIOUR))); + String getURL(); @@ -60,6 +78,9 @@ public interface BindingURL String getOption(String key); + Map<String,Object> getConsumerOptions(); + + boolean containsOption(String key); AMQShortString getRoutingKey(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index e609d73268..ddf13bdc37 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -126,7 +126,8 @@ public class Asserts Queue.MESSAGE_GROUP_SHARED_GROUPS, PriorityQueue.PRIORITIES, ConfiguredObject.CONTEXT, - ConfiguredObject.DESIRED_STATE); + ConfiguredObject.DESIRED_STATE, + Queue.DEFAULT_FILTERS); assertEquals("Unexpected value of queue attribute " + Queue.NAME, queueName, queueData.get(Queue.NAME)); assertNotNull("Unexpected value of queue attribute " + Queue.ID, queueData.get(Queue.ID)); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ArrivalTimeFilterTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ArrivalTimeFilterTest.java new file mode 100644 index 0000000000..9a5982e65d --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ArrivalTimeFilterTest.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class ArrivalTimeFilterTest extends QpidBrokerTestCase +{ + + private String _queueName; + private Connection _connection; + private Session _session; + private Queue _queue; + + protected void setUp() throws Exception + { + super.setUp(); + + _queueName = getTestQueueName(); + _connection = getConnection(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _connection.start(); + } + + + public void testArrivalTime0() throws AMQException, JMSException, InterruptedException + { + createDestinationWithFilter(0); + final MessageProducer prod = _session.createProducer(_queue); + TextMessage textMessage = _session.createTextMessage("hello"); + prod.send(textMessage); + + Thread.sleep(100); + + MessageConsumer cons = _session.createConsumer(_queue); + + assertNull("Message should not be received", cons.receive(500)); + + textMessage = _session.createTextMessage("hello"); + prod.send( textMessage); + + Message receivedMsg = cons.receive(500); + assertNotNull("Message should be received", receivedMsg); + } + + + public void testArrivalTime1000() throws AMQException, JMSException, InterruptedException + { + createDestinationWithFilter(1000); + final MessageProducer prod = _session.createProducer(_queue); + TextMessage textMessage = _session.createTextMessage("hello"); + prod.send(textMessage); + + Thread.sleep(100); + + MessageConsumer cons = _session.createConsumer(_queue); + + assertNotNull("Message should be received", cons.receive(500)); + + textMessage = _session.createTextMessage("hello"); + prod.send( textMessage); + + Message receivedMsg = cons.receive(500); + assertNotNull("Message should be received", receivedMsg); + } + + private void createDestinationWithFilter(final int period) throws AMQException, JMSException + { + ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, true, false, null); + Queue queue = new org.apache.qpid.client.AMQQueue("amq.direct", _queueName); + ((AMQSession<?,?>) _session).declareAndBind((AMQDestination)queue); + _queue = _session.createQueue("direct://amq.direct/"+_queueName+"/"+_queueName+"?x-qpid-replay-period='"+period+"0'"); + } + + +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DefaultFiltersTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DefaultFiltersTest.java new file mode 100644 index 0000000000..b676e653ac --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DefaultFiltersTest.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class DefaultFiltersTest extends QpidBrokerTestCase +{ + + private String _queueName; + private Connection _connection; + private Session _session; + private Queue _queue; + + protected void setUp() throws Exception + { + super.setUp(); + + _queueName = getTestQueueName(); + _connection = getConnection(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _connection.start(); + } + + private void createQueueWithDefaultFilter(String selector) throws AMQException + { + final Map<String,Object> arguments = new HashMap<>(); + selector = selector.replace("\\", "\\\\"); + selector = selector.replace("\"", "\\\""); + + arguments.put("qpid.default_filters","{ \"x-filter-jms-selector\" : { \"x-filter-jms-selector\" : [ \""+selector+"\" ] } }"); + ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); + _queue = new org.apache.qpid.client.AMQQueue("amq.direct", _queueName); + ((AMQSession<?,?>) _session).declareAndBind((AMQDestination)_queue); + } + + public void testDefaultFilterIsApplied() throws AMQException, JMSException + { + createQueueWithDefaultFilter("foo = 1"); + final MessageProducer prod = _session.createProducer(_queue); + TextMessage textMessage = _session.createTextMessage("hello"); + textMessage.setIntProperty("foo", 0); + prod.send(textMessage); + + MessageConsumer cons = _session.createConsumer(_queue); + + assertNull("Message with foo=0 should not be received", cons.receive(500)); + + textMessage = _session.createTextMessage("hello"); + textMessage.setIntProperty("foo", 1); + prod.send( textMessage); + + Message receivedMsg = cons.receive(500); + assertNotNull("Message with foo=1 should be received", receivedMsg); + assertEquals("Property foo not as expected", 1, receivedMsg.getIntProperty("foo")); + } + + + public void testDefaultFilterIsOverridden() throws AMQException, JMSException + { + createQueueWithDefaultFilter("foo = 1"); + final MessageProducer prod = _session.createProducer(_queue); + TextMessage textMessage = _session.createTextMessage("hello"); + textMessage.setIntProperty("foo", 0); + prod.send(textMessage); + + MessageConsumer cons = _session.createConsumer(_queue, "foo = 0"); + + Message receivedMsg = cons.receive(500); + assertNotNull("Message with foo=0 should be received", receivedMsg); + assertEquals("Property foo not as expected", 0, receivedMsg.getIntProperty("foo")); + + + textMessage = _session.createTextMessage("hello"); + textMessage.setIntProperty("foo", 1); + prod.send(textMessage); + + assertNull("Message with foo=1 should not be received", cons.receive(500)); + + } + +} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index f01e245560..249244a3ae 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -209,3 +209,7 @@ org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNo org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction org.apache.qpid.client.SyncPublishTest#* + +org.apache.qpid.server.queue.ArrivalTimeFilterTest#* +org.apache.qpid.server.queue.DefaultFiltersTest#* + |