summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-17 17:41:28 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-17 17:41:28 +0000
commit2d3a1f587a2201eed232cdc4b4ee589ea52e3606 (patch)
treedf3ffc561d223df11e6c5d1034f855a4da97549f /qpid
parent2e60cf6ce254d749890c5740d0d3b15e2b1e41a3 (diff)
downloadqpid-python-2d3a1f587a2201eed232cdc4b4ee589ea52e3606.tar.gz
QPID-6395 : [Java Broker] add support for queue default filters, and filters solely on arrival time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java59
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java58
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java57
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java105
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java65
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java41
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java65
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java21
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java3
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ArrivalTimeFilterTest.java107
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DefaultFiltersTest.java116
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes4
31 files changed, 792 insertions, 171 deletions
diff --git a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java
index a831e1ebd9..c09a2b4f72 100644
--- a/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java
+++ b/qpid/java/broker-codegen/src/main/java/org/apache/qpid/server/model/validation/AttributeAnnotationValidator.java
@@ -278,6 +278,13 @@ public class AttributeAnnotationValidator extends AbstractProcessor
return true;
}
+
+ if(typeUtils.isSameType(type,elementUtils.getTypeElement("java.lang.Object").asType()))
+ {
+ return true;
+ }
+
+
if(typeUtils.isSameType(type, elementUtils.getTypeElement("java.lang.String").asType()))
{
return true;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 597fc44e4c..de796a846a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -87,6 +87,12 @@ class HeadersBinding
+"' with arguments: " + _binding.getArguments());
_filter = new MessageFilter()
{
+ @Override
+ public String getName()
+ {
+ return "";
+ }
+
@Override
public boolean matches(Filterable message)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
new file mode 100644
index 0000000000..dbd6a5f6f6
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilter.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+
+public final class ArrivalTimeFilter implements MessageFilter
+{
+ private final long _startingFrom;
+
+ public ArrivalTimeFilter(final long startingFrom)
+ {
+ _startingFrom = startingFrom;
+ }
+
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.REPLAY_PERIOD.toString();
+ }
+
+ @Override
+ public boolean matches(final Filterable message)
+ {
+ return message.getArrivalTime() >= _startingFrom;
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
new file mode 100644
index 0000000000..28e05eaa52
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import java.util.List;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public final class ArrivalTimeFilterFactory implements MessageFilterFactory
+{
+
+ @Override
+ public MessageFilter newInstance(final List<Object> arguments)
+ {
+ if(arguments == null || arguments.size() != 1)
+ {
+ throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
+ }
+ Object arg = arguments.get(0);
+ long startingFrom;
+ if(arg instanceof Number)
+ {
+ startingFrom = ((Number)arg).longValue();
+ }
+ else
+ {
+ startingFrom = Long.parseLong(String.valueOf(arg));
+ }
+ return new ArrivalTimeFilter(startingFrom);
+ }
+
+ @Override
+ public String getType()
+ {
+ return AMQPFilterTypes.REPLAY_PERIOD.toString();
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
index 69fc520a8f..ad14fa423a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -14,26 +14,62 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
*
+ *
*/
package org.apache.qpid.server.filter;
-//
-// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
-//
import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-public interface FilterManager
+public class FilterManager
{
- void add(MessageFilter filter);
- void remove(MessageFilter filter);
+ private final Map<String, MessageFilter> _filters = new ConcurrentHashMap<>();
+
+ public FilterManager()
+ {
+ }
+
+ public void add(String name, MessageFilter filter)
+ {
+ _filters.put(name, filter);
+ }
+
+ public boolean allAllow(Filterable msg)
+ {
+ for (MessageFilter filter : _filters.values())
+ {
+ if (!filter.matches(msg))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Iterator<MessageFilter> filters()
+ {
+ return _filters.values().iterator();
+ }
+
+ public boolean hasFilters()
+ {
+ return !_filters.isEmpty();
+ }
+
+ public boolean hasFilter(final String name)
+ {
+ return _filters.containsKey(name);
+ }
- boolean allAllow(Filterable msg);
+ @Override
+ public String toString()
+ {
+ return _filters.toString();
+ }
- Iterator<MessageFilter> filters();
- boolean hasFilters();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
index a159a8506b..28f7fe3554 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.filter;
+import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -27,8 +29,6 @@ import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import java.util.Map;
-
public class FilterManagerFactory
{
@@ -54,20 +54,13 @@ public class FilterManagerFactory
if (selector instanceof String && !selector.equals(""))
{
- manager = new SimpleFilterManager();
+ manager = new FilterManager();
try
{
- manager.add(new JMSSelectorFilter((String)selector));
- }
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
+ MessageFilter filter = new JMSSelectorFilter((String)selector);
+ manager.add(filter.getName(), filter);
}
- catch (TokenMgrError e)
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index d0b1670a45..6b8ae2f552 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -26,12 +26,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
+
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.queue.AMQQueue;
public class FilterSupport
@@ -57,15 +59,7 @@ public class FilterSupport
{
selector = new JMSSelectorFilter(selectorString);
}
- catch (ParseException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (SelectorParsingException e)
- {
- throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
- }
- catch (TokenMgrError e)
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
}
@@ -119,6 +113,7 @@ public class FilterSupport
}
}
+ @PluggableService
public static final class NoLocalFilter implements MessageFilter
{
private final MessageSource _queue;
@@ -128,6 +123,12 @@ public class FilterSupport
_queue = queue;
}
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.NO_LOCAL.toString();
+ }
+
public boolean matches(Filterable message)
{
@@ -165,6 +166,8 @@ public class FilterSupport
{
return _queue != null ? _queue.hashCode() : 0;
}
+
+
}
static final class CompoundFilter implements MessageFilter
@@ -178,6 +181,12 @@ public class FilterSupport
_jmsSelectorFilter = jmsSelectorFilter;
}
+ @Override
+ public String getName()
+ {
+ return "";
+ }
+
public boolean matches(Filterable message)
{
return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
@@ -216,5 +225,7 @@ public class FilterSupport
result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
return result;
}
+
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index 589e888059..295f9ae074 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -34,6 +34,10 @@ public interface Filterable
Object getConnectionReference();
+ long getMessageNumber();
+
+ long getArrivalTime();
+
public class Factory
{
@@ -41,6 +45,7 @@ public interface Filterable
{
return new Filterable()
{
+
@Override
public AMQMessageHeader getMessageHeader()
{
@@ -64,6 +69,18 @@ public interface Filterable
{
return message.getConnectionReference();
}
+
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public long getArrivalTime()
+ {
+ return message.getArrivalTime();
+ }
};
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 744e4e4e9d..a36049cd23 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -25,14 +25,18 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.BooleanExpression;
import org.apache.qpid.filter.FilterableMessage;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.SelectorParser;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.plugin.PluggableService;
+@PluggableService
public class JMSSelectorFilter implements MessageFilter
{
private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
@@ -46,6 +50,12 @@ public class JMSSelectorFilter implements MessageFilter
_matcher = new SelectorParser().parse(selector);
}
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.JMS_SELECTOR.toString();
+ }
+
public boolean matches(Filterable message)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
new file mode 100644
index 0000000000..683906dc88
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import java.util.List;
+
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public final class JMSSelectorFilterFactory implements MessageFilterFactory
+{
+ public String getType()
+ {
+ return AMQPFilterTypes.JMS_SELECTOR.toString();
+ }
+
+ @Override
+ public MessageFilter newInstance(final List<Object> arguments)
+ {
+ if(arguments == null || arguments.size() != 1)
+ {
+ throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
+ }
+ Object arg = arguments.get(0);
+ try
+ {
+ return new JMSSelectorFilter(String.valueOf(arg));
+ }
+ catch (ParseException | TokenMgrError | SelectorParsingException e)
+ {
+ throw new IllegalArgumentException("Cannot create an JMS Selector from '" + arg + "'", e);
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
index d7dbbea166..226d646efd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -22,5 +22,6 @@ package org.apache.qpid.server.filter;
public interface MessageFilter
{
+ String getName();
boolean matches(Filterable message);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
deleted file mode 100644
index 111fb6a333..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.server.filter;
-
-import org.apache.log4j.Logger;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class SimpleFilterManager implements FilterManager
-{
- private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
-
- private final ConcurrentLinkedQueue<MessageFilter> _filters;
- private String _toString = "";
-
- public SimpleFilterManager()
- {
- _logger.debug("Creating SimpleFilterManager");
- _filters = new ConcurrentLinkedQueue<MessageFilter>();
- }
-
- public SimpleFilterManager(JMSSelectorFilter messageFilter)
- {
- this();
- add(messageFilter);
- }
-
- public void add(MessageFilter filter)
- {
- _filters.add(filter);
- updateStringValue();
- }
-
- public void remove(MessageFilter filter)
- {
- _filters.remove(filter);
- updateStringValue();
- }
-
- public boolean allAllow(Filterable msg)
- {
- for (MessageFilter filter : _filters)
- {
- if (!filter.matches(msg))
- {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Iterator<MessageFilter> filters()
- {
- return _filters.iterator();
- }
-
- public boolean hasFilters()
- {
- return !_filters.isEmpty();
- }
-
-
- @Override
- public String toString()
- {
- return _toString;
- }
-
- private void updateStringValue()
- {
- StringBuilder toString = new StringBuilder();
- for (MessageFilter filter : _filters)
- {
- toString.append(filter.toString());
- toString.append(",");
- }
-
- if (_filters.size() > 0)
- {
- //Remove the last ','
- toString.deleteCharAt(toString.length()-1);
- }
- _toString = toString.toString();
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 46fbaac3f2..b69c17dca0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.model;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
@@ -48,6 +50,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
+ String DEFAULT_FILTERS = "defaultFilters";
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -155,6 +158,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute
long getMaximumMessageTtl();
+ @ManagedAttribute
+ Map<String, Map<String,List<Object>>> getDefaultFilters();
+
//children
Collection<? extends Binding> getBindings();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
new file mode 100644
index 0000000000..9c76f5590e
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import java.util.List;
+
+import org.apache.qpid.server.filter.MessageFilter;
+
+public interface MessageFilterFactory extends Pluggable
+{
+ MessageFilter newInstance(List<Object> arguments);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
index f70afb12ba..e6100efda7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
@@ -19,8 +19,11 @@
package org.apache.qpid.server.plugin;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.ServiceLoader;
import org.apache.log4j.Logger;
@@ -47,6 +50,16 @@ public class QpidServiceLoader
return instancesOf(clazz, true);
}
+ public <C extends Pluggable> Map<String,C> getInstancesByType(Class<C> clazz)
+ {
+ Map<String,C> instances = new HashMap<>();
+ for(C instance : instancesOf(clazz))
+ {
+ instances.put(instance.getType(), instance);
+ }
+ return Collections.unmodifiableMap(instances);
+ }
+
private <C extends Pluggable> Iterable<C> instancesOf(Class<C> clazz, boolean atLeastOne)
{
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index f3711ee53a..02798e9834 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -51,6 +52,7 @@ import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -74,6 +76,8 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
@@ -185,6 +189,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private MessageDurability _messageDurability;
+ @ManagedAttributeField
+ private Map<String, Map<String,List<Object>>> _defaultFilters;
+
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
private final Set<NotificationCheck> _notificationChecks =
@@ -246,6 +253,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final QueueRunner _queueRunner = new QueueRunner(this);
private boolean _closing;
+ private final ConcurrentHashMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>();
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -449,6 +457,40 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
+
+ if(_defaultFilters != null)
+ {
+ QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
+ final Map<String, MessageFilterFactory> messageFilterFactories =
+ qpidServiceLoader.getInstancesByType(MessageFilterFactory.class);
+
+ for (Map.Entry<String,Map<String,List<Object>>> entry : _defaultFilters.entrySet())
+ {
+ String name = String.valueOf(entry.getKey());
+ Map<String, List<Object>> filterValue = entry.getValue();
+ if(filterValue.size() == 1)
+ {
+ String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
+ MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
+ if(filterFactory != null)
+ {
+ List<Object> filterArguments = filterValue.values().iterator().next();
+ _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + messageFilterFactories.keySet());
+ }
+ }
+ else
+ {
+ throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterValue);
+
+ }
+
+ }
+ }
+
updateAlertChecks();
}
@@ -554,6 +596,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public Map<String, Map<String, List<Object>>> getDefaultFilters()
+ {
+ return _defaultFilters;
+ }
+
+ @Override
public final MessageDurability getMessageDurability()
{
return _messageDurability;
@@ -602,7 +650,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
- final FilterManager filters,
+ FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
EnumSet<ConsumerImpl.Option> optionSet)
@@ -698,7 +746,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
throw new ExistingConsumerPreventsExclusive();
}
-
+ if(!_defaultFiltersMap.isEmpty())
+ {
+ if(filters == null)
+ {
+ filters = new FilterManager();
+ }
+ for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet())
+ {
+ if(!filters.hasFilter(filter.getKey()))
+ {
+ filters.add(filter.getKey(), filter.getValue());
+ }
+ }
+ }
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 49732e8345..ae1fe12c92 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -62,6 +62,9 @@ public class QueueArgumentsConverter
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
+
+
/**
* No-local queue argument is used to support the no-local feature of Durable Subscribers.
*/
@@ -99,6 +102,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
+ ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 1c42d9b6fe..497a66ab5e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -103,16 +102,23 @@ public class MockConsumer implements ConsumerTarget
{
if(_messageIds != null)
{
- SimpleFilterManager filters = new SimpleFilterManager();
- filters.add(new MessageFilter()
+ FilterManager filters = new FilterManager();
+ MessageFilter filter = new MessageFilter()
{
@Override
+ public String getName()
+ {
+ return "";
+ }
+
+ @Override
public boolean matches(final Filterable message)
{
final String messageId = message.getMessageHeader().getMessageId();
return _messageIds.contains(messageId);
}
- });
+ };
+ filters.add(filter.getName(), filter);
return filters;
}
else
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 288a4f946c..8632d04048 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -33,13 +33,16 @@ import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
@@ -257,6 +260,43 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
+
+ if(method.hasArguments() && method.getArguments().containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
+ {
+ Object value = method.getArguments().get(AMQPFilterTypes.REPLAY_PERIOD.toString());
+ final long period;
+ if(value instanceof Number)
+ {
+ period = ((Number)value).longValue();
+ }
+ else if(value instanceof String)
+ {
+ try
+ {
+ period = Long.parseLong(value.toString());
+ }
+ catch (NumberFormatException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ return;
+ }
+ }
+ else
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ return;
+ }
+ final long startingFrom = System.currentTimeMillis() - (1000l * period);
+ if(filterManager == null)
+ {
+ filterManager = new FilterManager();
+ }
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ filterManager.add(filter.getName(), filter);
+
+ }
+
+
ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
method.getAcceptMode(),
method.getAcquireMode(),
@@ -1596,4 +1636,5 @@ public class ServerSessionDelegate extends SessionDelegate
{
}
}
+
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 6108de2fda..9afa7c393f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -60,11 +60,11 @@ import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -670,7 +670,7 @@ public class AMQChannel
* @param tag the tag chosen by the client (if null, server will generate one)
* @param sources the queues to subscribe to
* @param acks Are acks enabled for this subscriber
- * @param filters Filters to apply to this subscriber
+ * @param arguments Filters to apply to this subscriber
*
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
@@ -678,7 +678,7 @@ public class AMQChannel
* @throws org.apache.qpid.AMQException if something goes wrong
*/
public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
- FieldTable filters, boolean exclusive, boolean noLocal)
+ FieldTable arguments, boolean exclusive, boolean noLocal)
throws MessageSource.ExistingConsumerPreventsExclusive,
MessageSource.ExistingExclusiveConsumer,
AMQInvalidArgumentException,
@@ -697,19 +697,19 @@ public class AMQChannel
ConsumerTarget_0_8 target;
EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
- if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+ if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager);
}
else if(acks)
{
- target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -729,23 +729,66 @@ public class AMQChannel
try
{
- FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
if(noLocal)
{
if(filterManager == null)
{
- filterManager = new SimpleFilterManager();
+ filterManager = new FilterManager();
}
final Object connectionReference = getConnectionReference();
- filterManager.add(new MessageFilter()
+ MessageFilter filter = new MessageFilter()
{
+
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.NO_LOCAL.toString();
+ }
+
@Override
public boolean matches(final Filterable message)
{
return message.getConnectionReference() != connectionReference;
}
- });
+ };
+ filterManager.add(filter.getName(), filter);
+ }
+
+ if(arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
+ {
+ Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString());
+ final long period;
+ if(value instanceof Number)
+ {
+ period = ((Number)value).longValue();
+ }
+ else if(value instanceof String)
+ {
+ try
+ {
+ period = Long.parseLong(value.toString());
+ }
+ catch (NumberFormatException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ }
+ }
+ else
+ {
+ throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ }
+
+ final long startingFrom = System.currentTimeMillis() - (1000l * period);
+ if(filterManager == null)
+ {
+ filterManager = new FilterManager();
+ }
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ filterManager.add(filter.getName(), filter);
+
}
+
for(MessageSource source : sources)
{
ConsumerImpl sub =
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 85a0b559c9..c952a3c585 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -65,8 +65,8 @@ import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -154,15 +154,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
actualFilters.put(entry.getKey(), entry.getValue());
}
- catch (ParseException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
- catch (SelectorParsingException e)
+ catch (ParseException | SelectorParsingException e)
{
Error error = new Error();
error.setCondition(AmqpError.INVALID_FIELD);
@@ -374,8 +366,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
name = getEndpoint().getName();
}
+
+ FilterManager filters = null;
+ if(messageFilter != null)
+ {
+ filters = new FilterManager();
+ filters.add(messageFilter.getName(), messageFilter);
+ }
+
_consumer = _queue.addConsumer(_target,
- messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ filters,
Message_1_0.class, name, options);
}
catch (MessageSource.ExistingExclusiveConsumer e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index a71555480f..2eeea4c967 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -89,6 +89,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
private RejectBehaviour _rejectBehaviour;
+ private Map<String,Object> _consumerArguments;
+
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
@@ -299,6 +301,7 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
_bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
final String rejectBehaviourValue = binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR);
_rejectBehaviour = rejectBehaviourValue == null ? null : RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase());
+ _consumerArguments = binding.getConsumerOptions();
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
@@ -718,6 +721,11 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
return result;
}
+ public Map<String, Object> getConsumerArguments()
+ {
+ return _consumerArguments;
+ }
+
public Reference getReference() throws NamingException
{
return new Reference(
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 9cef1f8dce..3c947043c6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -187,6 +187,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
final FieldTable ft = FieldTableFactory.newFieldTable();
+ if(destination.getConsumerArguments() != null)
+ {
+ ft.addAll(FieldTable.convertToFieldTable(destination.getConsumerArguments()));
+ }
// rawSelector is used by HeadersExchange and is not a JMS Selector
if (rawSelector != null)
{
@@ -203,6 +207,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
}
+
_arguments = ft;
_addressType = _destination.getAddressType();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 1d7bb6087a..4f2715bd7b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -69,6 +69,7 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+
_topicDestinationCache = session.getTopicDestinationCache();
_queueDestinationCache = session.getQueueDestinationCache();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
index 483fbaea50..d033bf86c2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
@@ -30,7 +30,8 @@ public enum AMQPFilterTypes
JMS_SELECTOR("x-filter-jms-selector"),
NO_CONSUME("x-filter-no-consume"),
AUTO_CLOSE("x-filter-auto-close"),
- NO_LOCAL("x-qpid-no-local");
+ NO_LOCAL("x-qpid-no-local"),
+ REPLAY_PERIOD("x-qpid-replay-period");
/** The identifying string for the filter type. */
private final AMQShortString _value;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
index 77902c3531..3ea99ce4ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
@@ -20,15 +20,16 @@
*/
package org.apache.qpid.url;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
public class AMQBindingURL implements BindingURL
{
private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class);
@@ -135,6 +136,20 @@ public class AMQBindingURL implements BindingURL
return _options.get(key);
}
+ @Override
+ public Map<String,Object> getConsumerOptions()
+ {
+ Map<String,Object> options = new HashMap<>();
+ for(Map.Entry<String,String> option : _options.entrySet())
+ {
+ if(!NON_CONSUMER_OPTIONS.contains(option.getKey()))
+ {
+ options.put(option.getKey(), option.getValue());
+ }
+ }
+ return options;
+ }
+
public void setOption(String key, String value)
{
_options.put(key, value);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
index 80a1ae540b..91f80ff88c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.url;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.qpid.framing.AMQShortString;
/*
@@ -47,6 +53,18 @@ public interface BindingURL
*/
public static final String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour";
+ public static final Set<String> NON_CONSUMER_OPTIONS =
+ Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(OPTION_EXCLUSIVE,
+ OPTION_AUTODELETE,
+ OPTION_DURABLE,
+ OPTION_BROWSE,
+ OPTION_ROUTING_KEY,
+ OPTION_BINDING_KEY,
+ OPTION_EXCHANGE_AUTODELETE,
+ OPTION_EXCHANGE_DURABLE,
+ OPTION_EXCHANGE_DURABLE,
+ OPTION_REJECT_BEHAVIOUR)));
+
String getURL();
@@ -60,6 +78,9 @@ public interface BindingURL
String getOption(String key);
+ Map<String,Object> getConsumerOptions();
+
+
boolean containsOption(String key);
AMQShortString getRoutingKey();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index e609d73268..ddf13bdc37 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -126,7 +126,8 @@ public class Asserts
Queue.MESSAGE_GROUP_SHARED_GROUPS,
PriorityQueue.PRIORITIES,
ConfiguredObject.CONTEXT,
- ConfiguredObject.DESIRED_STATE);
+ ConfiguredObject.DESIRED_STATE,
+ Queue.DEFAULT_FILTERS);
assertEquals("Unexpected value of queue attribute " + Queue.NAME, queueName, queueData.get(Queue.NAME));
assertNotNull("Unexpected value of queue attribute " + Queue.ID, queueData.get(Queue.ID));
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ArrivalTimeFilterTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ArrivalTimeFilterTest.java
new file mode 100644
index 0000000000..9a5982e65d
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ArrivalTimeFilterTest.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ArrivalTimeFilterTest extends QpidBrokerTestCase
+{
+
+ private String _queueName;
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queueName = getTestQueueName();
+ _connection = getConnection();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _connection.start();
+ }
+
+
+ public void testArrivalTime0() throws AMQException, JMSException, InterruptedException
+ {
+ createDestinationWithFilter(0);
+ final MessageProducer prod = _session.createProducer(_queue);
+ TextMessage textMessage = _session.createTextMessage("hello");
+ prod.send(textMessage);
+
+ Thread.sleep(100);
+
+ MessageConsumer cons = _session.createConsumer(_queue);
+
+ assertNull("Message should not be received", cons.receive(500));
+
+ textMessage = _session.createTextMessage("hello");
+ prod.send( textMessage);
+
+ Message receivedMsg = cons.receive(500);
+ assertNotNull("Message should be received", receivedMsg);
+ }
+
+
+ public void testArrivalTime1000() throws AMQException, JMSException, InterruptedException
+ {
+ createDestinationWithFilter(1000);
+ final MessageProducer prod = _session.createProducer(_queue);
+ TextMessage textMessage = _session.createTextMessage("hello");
+ prod.send(textMessage);
+
+ Thread.sleep(100);
+
+ MessageConsumer cons = _session.createConsumer(_queue);
+
+ assertNotNull("Message should be received", cons.receive(500));
+
+ textMessage = _session.createTextMessage("hello");
+ prod.send( textMessage);
+
+ Message receivedMsg = cons.receive(500);
+ assertNotNull("Message should be received", receivedMsg);
+ }
+
+ private void createDestinationWithFilter(final int period) throws AMQException, JMSException
+ {
+ ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, true, false, null);
+ Queue queue = new org.apache.qpid.client.AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?,?>) _session).declareAndBind((AMQDestination)queue);
+ _queue = _session.createQueue("direct://amq.direct/"+_queueName+"/"+_queueName+"?x-qpid-replay-period='"+period+"0'");
+ }
+
+
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DefaultFiltersTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DefaultFiltersTest.java
new file mode 100644
index 0000000000..b676e653ac
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DefaultFiltersTest.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class DefaultFiltersTest extends QpidBrokerTestCase
+{
+
+ private String _queueName;
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queueName = getTestQueueName();
+ _connection = getConnection();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _connection.start();
+ }
+
+ private void createQueueWithDefaultFilter(String selector) throws AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<>();
+ selector = selector.replace("\\", "\\\\");
+ selector = selector.replace("\"", "\\\"");
+
+ arguments.put("qpid.default_filters","{ \"x-filter-jms-selector\" : { \"x-filter-jms-selector\" : [ \""+selector+"\" ] } }");
+ ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, true, false, arguments);
+ _queue = new org.apache.qpid.client.AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?,?>) _session).declareAndBind((AMQDestination)_queue);
+ }
+
+ public void testDefaultFilterIsApplied() throws AMQException, JMSException
+ {
+ createQueueWithDefaultFilter("foo = 1");
+ final MessageProducer prod = _session.createProducer(_queue);
+ TextMessage textMessage = _session.createTextMessage("hello");
+ textMessage.setIntProperty("foo", 0);
+ prod.send(textMessage);
+
+ MessageConsumer cons = _session.createConsumer(_queue);
+
+ assertNull("Message with foo=0 should not be received", cons.receive(500));
+
+ textMessage = _session.createTextMessage("hello");
+ textMessage.setIntProperty("foo", 1);
+ prod.send( textMessage);
+
+ Message receivedMsg = cons.receive(500);
+ assertNotNull("Message with foo=1 should be received", receivedMsg);
+ assertEquals("Property foo not as expected", 1, receivedMsg.getIntProperty("foo"));
+ }
+
+
+ public void testDefaultFilterIsOverridden() throws AMQException, JMSException
+ {
+ createQueueWithDefaultFilter("foo = 1");
+ final MessageProducer prod = _session.createProducer(_queue);
+ TextMessage textMessage = _session.createTextMessage("hello");
+ textMessage.setIntProperty("foo", 0);
+ prod.send(textMessage);
+
+ MessageConsumer cons = _session.createConsumer(_queue, "foo = 0");
+
+ Message receivedMsg = cons.receive(500);
+ assertNotNull("Message with foo=0 should be received", receivedMsg);
+ assertEquals("Property foo not as expected", 0, receivedMsg.getIntProperty("foo"));
+
+
+ textMessage = _session.createTextMessage("hello");
+ textMessage.setIntProperty("foo", 1);
+ prod.send(textMessage);
+
+ assertNull("Message with foo=1 should not be received", cons.receive(500));
+
+ }
+
+}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index f01e245560..249244a3ae 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -209,3 +209,7 @@ org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNo
org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
org.apache.qpid.client.SyncPublishTest#*
+
+org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
+org.apache.qpid.server.queue.DefaultFiltersTest#*
+