summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/filter
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/filter')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java275
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java106
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java599
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java209
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java75
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java63
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java120
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java235
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java94
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java362
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java126
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java101
18 files changed, 2609 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
new file mode 100644
index 0000000000..221d23ef0d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
@@ -0,0 +1,275 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * An expression which performs an operation on two expression values
+ */
+public abstract class ArithmeticExpression extends BinaryExpression
+{
+
+ protected static final int INTEGER = 1;
+ protected static final int LONG = 2;
+ protected static final int DOUBLE = 3;
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ArithmeticExpression(Expression left, Expression right)
+ {
+ super(left, right);
+ }
+
+ public static Expression createPlus(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof String)
+ {
+ String text = (String) lvalue;
+ String answer = text + rvalue;
+
+ return answer;
+ }
+ else if (lvalue instanceof Number)
+ {
+ return plus((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "+";
+ }
+ };
+ }
+
+ public static Expression createMinus(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return minus((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "-";
+ }
+ };
+ }
+
+ public static Expression createMultiply(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return multiply((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "*";
+ }
+ };
+ }
+
+ public static Expression createDivide(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return divide((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "/";
+ }
+ };
+ }
+
+ public static Expression createMod(Expression left, Expression right)
+ {
+ return new ArithmeticExpression(left, right)
+ {
+
+ protected Object evaluate(Object lvalue, Object rvalue)
+ {
+ if (lvalue instanceof Number)
+ {
+ return mod((Number) lvalue, asNumber(rvalue));
+ }
+
+ throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "%";
+ }
+ };
+ }
+
+ protected Number plus(Number left, Number right)
+ {
+ switch (numberType(left, right))
+ {
+
+ case INTEGER:
+ return Integer.valueOf(left.intValue() + right.intValue());
+
+ case LONG:
+ return Long.valueOf(left.longValue() + right.longValue());
+
+ default:
+ return Double.valueOf(left.doubleValue() + right.doubleValue());
+ }
+ }
+
+ protected Number minus(Number left, Number right)
+ {
+ switch (numberType(left, right))
+ {
+
+ case INTEGER:
+ return Integer.valueOf(left.intValue() - right.intValue());
+
+ case LONG:
+ return Long.valueOf(left.longValue() - right.longValue());
+
+ default:
+ return Double.valueOf(left.doubleValue() - right.doubleValue());
+ }
+ }
+
+ protected Number multiply(Number left, Number right)
+ {
+ switch (numberType(left, right))
+ {
+
+ case INTEGER:
+ return Integer.valueOf(left.intValue() * right.intValue());
+
+ case LONG:
+ return Long.valueOf(left.longValue() * right.longValue());
+
+ default:
+ return Double.valueOf(left.doubleValue() * right.doubleValue());
+ }
+ }
+
+ protected Number divide(Number left, Number right)
+ {
+ return Double.valueOf(left.doubleValue() / right.doubleValue());
+ }
+
+ protected Number mod(Number left, Number right)
+ {
+ return Double.valueOf(left.doubleValue() % right.doubleValue());
+ }
+
+ private int numberType(Number left, Number right)
+ {
+ if (isDouble(left) || isDouble(right))
+ {
+ return DOUBLE;
+ }
+ else if ((left instanceof Long) || (right instanceof Long))
+ {
+ return LONG;
+ }
+ else
+ {
+ return INTEGER;
+ }
+ }
+
+ private boolean isDouble(Number n)
+ {
+ return (n instanceof Float) || (n instanceof Double);
+ }
+
+ protected Number asNumber(Object value)
+ {
+ if (value instanceof Number)
+ {
+ return (Number) value;
+ }
+ else
+ {
+ throw new RuntimeException("Cannot convert value: " + value + " into a number");
+ }
+ }
+
+ public Object evaluate(Filterable message)
+ {
+ Object lvalue = left.evaluate(message);
+ if (lvalue == null)
+ {
+ return null;
+ }
+
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ return evaluate(lvalue, rvalue);
+ }
+
+ /**
+ * @param lvalue
+ * @param rvalue
+ * @return
+ */
+ protected abstract Object evaluate(Object lvalue, Object rvalue);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
new file mode 100644
index 0000000000..024257bea9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+/**
+ * An expression which performs an operation on two expression values.
+ */
+public abstract class BinaryExpression implements Expression
+{
+ protected Expression left;
+ protected Expression right;
+
+ public BinaryExpression(Expression left, Expression right)
+ {
+ this.left = left;
+ this.right = right;
+ }
+
+ public Expression getLeft()
+ {
+ return left;
+ }
+
+ public Expression getRight()
+ {
+ return right;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if ((o == null) || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ public abstract String getExpressionSymbol();
+
+ /**
+ * @param expression
+ */
+ public void setRight(Expression expression)
+ {
+ right = expression;
+ }
+
+ /**
+ * @param expression
+ */
+ public void setLeft(Expression expression)
+ {
+ left = expression;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
new file mode 100644
index 0000000000..06e8664470
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * A BooleanExpression is an expression that always
+ * produces a Boolean result.
+ */
+public interface BooleanExpression extends Expression
+{
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ */
+ public boolean matches(Filterable message);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
new file mode 100644
index 0000000000..aad9d41174
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
@@ -0,0 +1,599 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * A filter performing a comparison of two objects
+ */
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
+{
+
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
+ {
+ return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
+ }
+
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
+ {
+ return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
+ }
+
+ private static final HashSet<Character> REGEXP_CONTROL_CHARS = new HashSet<Character>();
+
+ static
+ {
+ REGEXP_CONTROL_CHARS.add('.');
+ REGEXP_CONTROL_CHARS.add('\\');
+ REGEXP_CONTROL_CHARS.add('[');
+ REGEXP_CONTROL_CHARS.add(']');
+ REGEXP_CONTROL_CHARS.add('^');
+ REGEXP_CONTROL_CHARS.add('$');
+ REGEXP_CONTROL_CHARS.add('?');
+ REGEXP_CONTROL_CHARS.add('*');
+ REGEXP_CONTROL_CHARS.add('+');
+ REGEXP_CONTROL_CHARS.add('{');
+ REGEXP_CONTROL_CHARS.add('}');
+ REGEXP_CONTROL_CHARS.add('|');
+ REGEXP_CONTROL_CHARS.add('(');
+ REGEXP_CONTROL_CHARS.add(')');
+ REGEXP_CONTROL_CHARS.add(':');
+ REGEXP_CONTROL_CHARS.add('&');
+ REGEXP_CONTROL_CHARS.add('<');
+ REGEXP_CONTROL_CHARS.add('>');
+ REGEXP_CONTROL_CHARS.add('=');
+ REGEXP_CONTROL_CHARS.add('!');
+ }
+
+ static class LikeExpression extends UnaryExpression implements BooleanExpression
+ {
+
+ Pattern likePattern;
+
+ /**
+ * @param right
+ */
+ public LikeExpression(Expression right, String like, int escape)
+ {
+ super(right);
+
+ StringBuffer regexp = new StringBuffer(like.length() * 2);
+ regexp.append("\\A"); // The beginning of the input
+ for (int i = 0; i < like.length(); i++)
+ {
+ char c = like.charAt(i);
+ if (escape == (0xFFFF & c))
+ {
+ i++;
+ if (i >= like.length())
+ {
+ // nothing left to escape...
+ break;
+ }
+
+ char t = like.charAt(i);
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & t));
+ }
+ else if (c == '%')
+ {
+ regexp.append(".*?"); // Do a non-greedy match
+ }
+ else if (c == '_')
+ {
+ regexp.append("."); // match one
+ }
+ else if (REGEXP_CONTROL_CHARS.contains(c))
+ {
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & c));
+ }
+ else
+ {
+ regexp.append(c);
+ }
+ }
+
+ regexp.append("\\z"); // The end of the input
+
+ likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL);
+ }
+
+ /**
+ * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol()
+ */
+ public String getExpressionSymbol()
+ {
+ return "LIKE";
+ }
+
+ /**
+ * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
+ */
+ public Object evaluate(Filterable message)
+ {
+
+ Object rv = this.getRight().evaluate(message);
+
+ if (rv == null)
+ {
+ return null;
+ }
+
+ if (!(rv instanceof String))
+ {
+ return
+ Boolean.FALSE;
+ // throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass());
+ }
+
+ return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+ }
+
+ public static BooleanExpression createLike(Expression left, String right, String escape)
+ {
+ if ((escape != null) && (escape.length() != 1))
+ {
+ throw new RuntimeException(
+ "The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape);
+ }
+
+ int c = -1;
+ if (escape != null)
+ {
+ c = 0xFFFF & escape.charAt(0);
+ }
+
+ return new LikeExpression(left, right, c);
+ }
+
+ public static BooleanExpression createNotLike(Expression left, String right, String escape)
+ {
+ return UnaryExpression.createNOT(createLike(left, right, escape));
+ }
+
+ public static BooleanExpression createInFilter(Expression left, List elements)
+ {
+
+ if (!(left instanceof PropertyExpression))
+ {
+ throw new RuntimeException("Expected a property for In expression, got: " + left);
+ }
+
+ return UnaryExpression.createInExpression((PropertyExpression) left, elements, false);
+
+ }
+
+ public static BooleanExpression createNotInFilter(Expression left, List elements)
+ {
+
+ if (!(left instanceof PropertyExpression))
+ {
+ throw new RuntimeException("Expected a property for In expression, got: " + left);
+ }
+
+ return UnaryExpression.createInExpression((PropertyExpression) left, elements, true);
+
+ }
+
+ public static BooleanExpression createIsNull(Expression left)
+ {
+ return doCreateEqual(left, ConstantExpression.NULL);
+ }
+
+ public static BooleanExpression createIsNotNull(Expression left)
+ {
+ return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
+ }
+
+ public static BooleanExpression createNotEqual(Expression left, Expression right)
+ {
+ return UnaryExpression.createNOT(createEqual(left, right));
+ }
+
+ public static BooleanExpression createEqual(Expression left, Expression right)
+ {
+ checkEqualOperand(left);
+ checkEqualOperand(right);
+ checkEqualOperandCompatability(left, right);
+
+ return doCreateEqual(left, right);
+ }
+
+ private static BooleanExpression doCreateEqual(Expression left, Expression right)
+ {
+ return new EqualExpression(left, right);
+ }
+
+ public static BooleanExpression createGreaterThan(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+ protected boolean asBoolean(int answer)
+ {
+ return answer > 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return ">";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+ protected boolean asBoolean(int answer)
+ {
+ return answer >= 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return ">=";
+ }
+ };
+ }
+
+ public static BooleanExpression createLessThan(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+
+ protected boolean asBoolean(int answer)
+ {
+ return answer < 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "<";
+ }
+
+ };
+ }
+
+ public static BooleanExpression createLessThanEqual(final Expression left, final Expression right)
+ {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+
+ return new ComparisonExpression(left, right)
+ {
+
+ protected boolean asBoolean(int answer)
+ {
+ return answer <= 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "<=";
+ }
+ };
+ }
+
+ /**
+ * Only Numeric expressions can be used in >, >=, < or <= expressions.s
+ *
+ * @param expr
+ */
+ public static void checkLessThanOperand(Expression expr)
+ {
+ if (expr instanceof ConstantExpression)
+ {
+ Object value = ((ConstantExpression) expr).getValue();
+ if (value instanceof Number)
+ {
+ return;
+ }
+
+ // Else it's boolean or a String..
+ throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+ }
+
+ if (expr instanceof BooleanExpression)
+ {
+ throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+ }
+ }
+
+ /**
+ * Validates that the expression can be used in == or <> expression.
+ * Cannot not be NULL TRUE or FALSE litterals.
+ *
+ * @param expr
+ */
+ public static void checkEqualOperand(Expression expr)
+ {
+ if (expr instanceof ConstantExpression)
+ {
+ Object value = ((ConstantExpression) expr).getValue();
+ if (value == null)
+ {
+ throw new RuntimeException("'" + expr + "' cannot be compared.");
+ }
+ }
+ }
+
+ /**
+ *
+ * @param left
+ * @param right
+ */
+ private static void checkEqualOperandCompatability(Expression left, Expression right)
+ {
+ if ((left instanceof ConstantExpression) && (right instanceof ConstantExpression))
+ {
+ if ((left instanceof BooleanExpression) && !(right instanceof BooleanExpression))
+ {
+ throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'");
+ }
+ }
+ }
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ComparisonExpression(Expression left, Expression right)
+ {
+ super(left, right);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+ Comparable lv = (Comparable) left.evaluate(message);
+ if (lv == null)
+ {
+ return null;
+ }
+
+ Comparable rv = (Comparable) right.evaluate(message);
+ if (rv == null)
+ {
+ return null;
+ }
+
+ return compare(lv, rv);
+ }
+
+ protected Boolean compare(Comparable lv, Comparable rv)
+ {
+ Class lc = lv.getClass();
+ Class rc = rv.getClass();
+ // If the the objects are not of the same type,
+ // try to convert up to allow the comparison.
+ if (lc != rc)
+ {
+ if (lc == Byte.class)
+ {
+ if (rc == Short.class)
+ {
+ lv = ((Number) lv).shortValue();
+ }
+ else if (rc == Integer.class)
+ {
+ lv = ((Number) lv).intValue();
+ }
+ else if (rc == Long.class)
+ {
+ lv = ((Number) lv).longValue();
+ }
+ else if (rc == Float.class)
+ {
+ lv = ((Number) lv).floatValue();
+ }
+ else if (rc == Double.class)
+ {
+ lv = ((Number) lv).doubleValue();
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Short.class)
+ {
+ if (rc == Integer.class)
+ {
+ lv = ((Number) lv).intValue();
+ }
+ else if (rc == Long.class)
+ {
+ lv = ((Number) lv).longValue();
+ }
+ else if (rc == Float.class)
+ {
+ lv = ((Number) lv).floatValue();
+ }
+ else if (rc == Double.class)
+ {
+ lv = ((Number) lv).doubleValue();
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Integer.class)
+ {
+ if (rc == Long.class)
+ {
+ lv = ((Number) lv).longValue();
+ }
+ else if (rc == Float.class)
+ {
+ lv = ((Number) lv).floatValue();
+ }
+ else if (rc == Double.class)
+ {
+ lv = ((Number) lv).doubleValue();
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Long.class)
+ {
+ if (rc == Integer.class)
+ {
+ rv = ((Number) rv).longValue();
+ }
+ else if (rc == Float.class)
+ {
+ lv = ((Number) lv).floatValue();
+ }
+ else if (rc == Double.class)
+ {
+ lv = ((Number) lv).doubleValue();
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Float.class)
+ {
+ if (rc == Integer.class)
+ {
+ rv = ((Number) rv).floatValue();
+ }
+ else if (rc == Long.class)
+ {
+ rv = ((Number) rv).floatValue();
+ }
+ else if (rc == Double.class)
+ {
+ lv = ((Number) lv).doubleValue();
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Double.class)
+ {
+ if (rc == Integer.class)
+ {
+ rv = ((Number) rv).doubleValue();
+ }
+ else if (rc == Long.class)
+ {
+ rv = ((Number) rv).doubleValue();
+ }
+ else if (rc == Float.class)
+ {
+ rv = ((Number) rv).doubleValue();
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+ }
+
+ return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ protected abstract boolean asBoolean(int answer);
+
+ public boolean matches(Filterable message)
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+
+ private static class EqualExpression extends ComparisonExpression
+ {
+ public EqualExpression(final Expression left, final Expression right)
+ {
+ super(left, right);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+ Object lv = left.evaluate(message);
+ Object rv = right.evaluate(message);
+
+ // Iff one of the values is null
+ if ((lv == null) ^ (rv == null))
+ {
+ return Boolean.FALSE;
+ }
+
+ if ((lv == rv) || lv.equals(rv))
+ {
+ return Boolean.TRUE;
+ }
+
+ if ((lv instanceof Comparable) && (rv instanceof Comparable))
+ {
+ return compare((Comparable) lv, (Comparable) rv);
+ }
+
+ return Boolean.FALSE;
+ }
+
+ protected boolean asBoolean(int answer)
+ {
+ return answer == 0;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "=";
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
new file mode 100644
index 0000000000..5cc9ca8ef2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
@@ -0,0 +1,209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.math.BigDecimal;
+
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * Represents a constant expression
+ */
+public class ConstantExpression implements Expression
+{
+
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
+ {
+ public BooleanConstantExpression(Object value)
+ {
+ super(value);
+ }
+
+ public boolean matches(Filterable message)
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+ }
+
+ public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
+ public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
+ public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
+
+ private Object value;
+
+ public static ConstantExpression createFromDecimal(String text)
+ {
+
+ // Strip off the 'l' or 'L' if needed.
+ if (text.endsWith("l") || text.endsWith("L"))
+ {
+ text = text.substring(0, text.length() - 1);
+ }
+
+ Number value;
+ try
+ {
+ value = new Long(text);
+ }
+ catch (NumberFormatException e)
+ {
+ // The number may be too big to fit in a long.
+ value = new BigDecimal(text);
+ }
+
+ long l = value.longValue();
+ if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE))
+ {
+ value = value.intValue();
+ }
+
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromHex(String text)
+ {
+ Number value = Long.parseLong(text.substring(2), 16);
+ long l = value.longValue();
+ if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE))
+ {
+ value = value.intValue();
+ }
+
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromOctal(String text)
+ {
+ Number value = Long.parseLong(text, 8);
+ long l = value.longValue();
+ if ((Integer.MIN_VALUE <= l) && (l <= Integer.MAX_VALUE))
+ {
+ value = value.intValue();
+ }
+
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFloat(String text)
+ {
+ Number value = new Double(text);
+
+ return new ConstantExpression(value);
+ }
+
+ public ConstantExpression(Object value)
+ {
+ this.value = value;
+ }
+
+ public Object evaluate(Filterable message)
+ {
+ return value;
+ }
+
+ public Object getValue()
+ {
+ return value;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ if (value == null)
+ {
+ return "NULL";
+ }
+
+ if (value instanceof Boolean)
+ {
+ return ((Boolean) value) ? "TRUE" : "FALSE";
+ }
+
+ if (value instanceof String)
+ {
+ return encodeString((String) value);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if ((o == null) || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Encodes the value of string so that it looks like it would look like
+ * when it was provided in a selector.
+ *
+ * @param s
+ * @return
+ */
+ public static String encodeString(String s)
+ {
+ StringBuffer b = new StringBuffer();
+ b.append('\'');
+ for (int i = 0; i < s.length(); i++)
+ {
+ char c = s.charAt(i);
+ if (c == '\'')
+ {
+ b.append(c);
+ }
+
+ b.append(c);
+ }
+
+ b.append('\'');
+
+ return b.toString();
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
new file mode 100644
index 0000000000..97e9915271
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * Represents an expression
+ */
+public interface Expression
+{
+
+ /**
+ * @return the value of this expression
+ */
+ public Object evaluate(Filterable message);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
new file mode 100644
index 0000000000..b5e282038b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.Filterable;
+
+public interface FilterManager
+{
+ void add(MessageFilter filter);
+
+ void remove(MessageFilter filter);
+
+ boolean allAllow(Filterable msg);
+
+ boolean hasFilters();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
new file mode 100644
index 0000000000..dac517150a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import java.util.Map;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.log4j.Logger;
+
+
+public class FilterManagerFactory
+{
+
+ private final static Logger _logger = Logger.getLogger(FilterManagerFactory.class);
+
+ //fixme move to a common class so it can be refered to from client code.
+
+ public static FilterManager createManager(FieldTable filters) throws AMQException
+ {
+ FilterManager manager = null;
+
+ if (filters != null)
+ {
+
+
+
+ if(filters.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()))
+ {
+ String selector = filters.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
+
+ if (selector != null && !selector.equals(""))
+ {
+ manager = new SimpleFilterManager();
+ manager.add(new JMSSelectorFilter(selector));
+ }
+
+ }
+
+
+ }
+ else
+ {
+ _logger.debug("No Filters found.");
+ }
+
+
+ return manager;
+
+ }
+
+ public static FilterManager createManager(Map<String,Object> map) throws AMQException
+ {
+ return createManager(FieldTable.convertToFieldTable(map));
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
new file mode 100644
index 0000000000..f32de03841
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.jms.selector.SelectorParser;
+import org.apache.qpid.server.queue.Filterable;
+
+
+public class JMSSelectorFilter implements MessageFilter
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
+
+ private String _selector;
+ private BooleanExpression _matcher;
+
+ public JMSSelectorFilter(String selector) throws AMQInvalidArgumentException
+ {
+ _selector = selector;
+ _matcher = new SelectorParser().parse(selector);
+ }
+
+ public boolean matches(Filterable message)
+ {
+ boolean match = _matcher.matches(message);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
+ }
+ return match;
+ }
+
+ public String getSelector()
+ {
+ return _selector;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JMSSelector("+_selector+")";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
new file mode 100644
index 0000000000..fdba184da4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
@@ -0,0 +1,120 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * A filter performing a comparison of two objects
+ */
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
+{
+
+ public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
+ {
+ return new OrExpression(lvalue, rvalue);
+ }
+
+ public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
+ {
+ return new AndExpression(lvalue, rvalue);
+ }
+
+ /**
+ * @param left
+ * @param right
+ */
+ public LogicExpression(BooleanExpression left, BooleanExpression right)
+ {
+ super(left, right);
+ }
+
+ public abstract Object evaluate(Filterable message);
+
+ public boolean matches(Filterable message)
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+
+ private static class OrExpression extends LogicExpression
+ {
+ public OrExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
+ {
+ super(lvalue, rvalue);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+ // Can we do an OR shortcut??
+ if ((lv != null) && lv.booleanValue())
+ {
+ return Boolean.TRUE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+
+ return (rv == null) ? null : rv;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "OR";
+ }
+ }
+
+ private static class AndExpression extends LogicExpression
+ {
+ public AndExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
+ {
+ super(lvalue, rvalue);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+
+ // Can we do an AND shortcut??
+ if (lv == null)
+ {
+ return null;
+ }
+
+ if (!lv.booleanValue())
+ {
+ return Boolean.FALSE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+
+ return (rv == null) ? null : rv;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "AND";
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
new file mode 100644
index 0000000000..f5416af09a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.Filterable;
+
+public interface MessageFilter
+{
+ boolean matches(Filterable message);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
new file mode 100644
index 0000000000..65ddf19fc4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.Filterable;
+
+public class NoConsumerFilter implements MessageFilter
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(NoConsumerFilter.class);
+
+
+ public NoConsumerFilter() throws AMQException
+ {
+ _logger.info("Created NoConsumerFilter");
+ }
+
+ public boolean matches(Filterable message)
+ {
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "NoConsumer";
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
new file mode 100644
index 0000000000..11fdeae2b1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -0,0 +1,235 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.CommonContentHeaderProperties;
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * Represents a property expression
+ */
+public class PropertyExpression implements Expression
+{
+ // Constants - defined the same as JMS
+ private static final int NON_PERSISTENT = 1;
+ private static final int PERSISTENT = 2;
+ private static final int DEFAULT_PRIORITY = 4;
+
+ private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
+
+ private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
+
+ {
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression()
+ {
+ public Object evaluate(Filterable message)
+ {
+ //TODO
+ return null;
+ }
+ });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new ReplyToExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType", new TypeExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new DeliveryModeExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new PriorityExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSMessageID", new MessageIDExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new MessageIDExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new TimestampExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new CorrelationIdExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression()
+ {
+ public Object evaluate(Filterable message)
+ {
+ return message.isRedelivered();
+ }
+ });
+ }
+
+ private final String name;
+ private final Expression jmsPropertyExpression;
+
+ public boolean outerTest()
+ {
+ return false;
+ }
+
+ public PropertyExpression(String name)
+ {
+ this.name = name;
+
+
+
+ jmsPropertyExpression = (Expression) JMS_PROPERTY_EXPRESSIONS.get(name);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+
+ if (jmsPropertyExpression != null)
+ {
+ return jmsPropertyExpression.evaluate(message);
+ }
+ else
+ {
+ return message.getMessageHeader().getHeader(name);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return name;
+ }
+
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return name.hashCode();
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if ((o == null) || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+
+ return name.equals(((PropertyExpression) o).name);
+
+ }
+
+ private static class ReplyToExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+ String replyTo = message.getMessageHeader().getReplyTo();
+ return replyTo;
+ }
+
+ }
+
+ private static class TypeExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+
+ String type = message.getMessageHeader().getType();
+ return type;
+
+ }
+ }
+
+ private static class DeliveryModeExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+ int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("JMSDeliveryMode is :" + mode);
+ }
+
+ return mode;
+ }
+ }
+
+ private static class PriorityExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+ byte priority = message.getMessageHeader().getPriority();
+ return (int) priority;
+ }
+ }
+
+ private static class MessageIDExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+
+ String messageId = message.getMessageHeader().getMessageId();
+
+ return messageId;
+
+ }
+ }
+
+ private static class TimestampExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+ long timestamp = message.getMessageHeader().getTimestamp();
+ return timestamp;
+ }
+ }
+
+ private static class CorrelationIdExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+
+ String correlationId = message.getMessageHeader().getCorrelationId();
+
+ return correlationId;
+ }
+ }
+
+ private static class ExpirationExpression implements Expression
+ {
+ public Object evaluate(Filterable message)
+ {
+ long expiration = message.getMessageHeader().getExpiration();
+ return expiration;
+
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
new file mode 100644
index 0000000000..c563569cb4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.Filterable;
+
+public class SimpleFilterManager implements FilterManager
+{
+ private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
+
+ private final ConcurrentLinkedQueue<MessageFilter> _filters;
+ private String _toString = "";
+
+ public SimpleFilterManager()
+ {
+ _logger.debug("Creating SimpleFilterManager");
+ _filters = new ConcurrentLinkedQueue<MessageFilter>();
+ }
+
+ public void add(MessageFilter filter)
+ {
+ _filters.add(filter);
+ updateStringValue();
+ }
+
+ public void remove(MessageFilter filter)
+ {
+ _filters.remove(filter);
+ updateStringValue();
+ }
+
+ public boolean allAllow(Filterable msg)
+ {
+ for (MessageFilter filter : _filters)
+ {
+ if (!filter.matches(msg))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean hasFilters()
+ {
+ return !_filters.isEmpty();
+ }
+
+
+ @Override
+ public String toString()
+ {
+ return _toString;
+ }
+
+ private void updateStringValue()
+ {
+ StringBuilder toString = new StringBuilder();
+ for (MessageFilter filter : _filters)
+ {
+ toString.append(filter.toString());
+ toString.append(",");
+ }
+
+ if (_filters.size() > 0)
+ {
+ //Remove the last ','
+ toString.deleteCharAt(toString.length()-1);
+ }
+ _toString = toString.toString();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
new file mode 100644
index 0000000000..557af95001
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
@@ -0,0 +1,362 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.Filterable;
+
+/**
+ * An expression which performs an operation on two expression values
+ */
+public abstract class UnaryExpression implements Expression
+{
+
+ private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
+ protected Expression right;
+
+ public static Expression createNegate(Expression left)
+ {
+ return new NegativeExpression(left);
+ }
+
+ public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not)
+ {
+
+ // Use a HashSet if there are many elements.
+ Collection t;
+ if (elements.size() == 0)
+ {
+ t = null;
+ }
+ else if (elements.size() < 5)
+ {
+ t = elements;
+ }
+ else
+ {
+ t = new HashSet(elements);
+ }
+
+ final Collection inList = t;
+
+ return new InExpression(right, inList, not);
+ }
+
+ abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression
+ {
+ public BooleanUnaryExpression(Expression left)
+ {
+ super(left);
+ }
+
+ public boolean matches(Filterable message)
+ {
+ Object object = evaluate(message);
+
+ return (object != null) && (object == Boolean.TRUE);
+ }
+ }
+ ;
+
+ public static<E extends Exception> BooleanExpression createNOT(BooleanExpression left)
+ {
+ return new NotExpression(left);
+ }
+
+ public static BooleanExpression createXPath(final String xpath)
+ {
+ return new XPathExpression(xpath);
+ }
+
+ public static BooleanExpression createXQuery(final String xpath)
+ {
+ return new XQueryExpression(xpath);
+ }
+
+ public static<E extends Exception> BooleanExpression createBooleanCast(Expression left)
+ {
+ return new BooleanCastExpression(left);
+ }
+
+ private static Number negate(Number left)
+ {
+ Class clazz = left.getClass();
+ if (clazz == Integer.class)
+ {
+ return -left.intValue();
+ }
+ else if (clazz == Long.class)
+ {
+ return -left.longValue();
+ }
+ else if (clazz == Float.class)
+ {
+ return -left.floatValue();
+ }
+ else if (clazz == Double.class)
+ {
+ return -left.doubleValue();
+ }
+ else if (clazz == BigDecimal.class)
+ {
+ // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the
+ // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it
+ // as a Big decimal. But it gets Negated right away.. to here we try to covert it back
+ // to a Long.
+ BigDecimal bd = (BigDecimal) left;
+ bd = bd.negate();
+
+ if (BD_LONG_MIN_VALUE.compareTo(bd) == 0)
+ {
+ return Long.MIN_VALUE;
+ }
+
+ return bd;
+ }
+ else
+ {
+ throw new RuntimeException("Don't know how to negate: " + left);
+ }
+ }
+
+ public UnaryExpression(Expression left)
+ {
+ this.right = left;
+ }
+
+ public Expression getRight()
+ {
+ return right;
+ }
+
+ public void setRight(Expression expression)
+ {
+ right = expression;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return "(" + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+ return ((o != null) && this.getClass().equals(o.getClass())) && toString().equals(o.toString());
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ public abstract String getExpressionSymbol();
+
+ private static class NegativeExpression extends UnaryExpression
+ {
+ public NegativeExpression(final Expression left)
+ {
+ super(left);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ if (rvalue instanceof Number)
+ {
+ return negate((Number) rvalue);
+ }
+
+ return null;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "-";
+ }
+ }
+
+ private static class InExpression extends BooleanUnaryExpression
+ {
+ private final Collection _inList;
+ private final boolean _not;
+
+ public InExpression(final PropertyExpression right, final Collection inList, final boolean not)
+ {
+ super(right);
+ _inList = inList;
+ _not = not;
+ }
+
+ public Object evaluate(Filterable message)
+ {
+
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ if (rvalue.getClass() != String.class)
+ {
+ return null;
+ }
+
+ if (((_inList != null) && _inList.contains(rvalue)) ^ _not)
+ {
+ return Boolean.TRUE;
+ }
+ else
+ {
+ return Boolean.FALSE;
+ }
+
+ }
+
+ public String toString()
+ {
+ StringBuffer answer = new StringBuffer();
+ answer.append(right);
+ answer.append(" ");
+ answer.append(getExpressionSymbol());
+ answer.append(" ( ");
+
+ int count = 0;
+ for (Iterator i = _inList.iterator(); i.hasNext();)
+ {
+ Object o = (Object) i.next();
+ if (count != 0)
+ {
+ answer.append(", ");
+ }
+
+ answer.append(o);
+ count++;
+ }
+
+ answer.append(" )");
+
+ return answer.toString();
+ }
+
+ public String getExpressionSymbol()
+ {
+ if (_not)
+ {
+ return "NOT IN";
+ }
+ else
+ {
+ return "IN";
+ }
+ }
+ }
+
+ private static class NotExpression extends BooleanUnaryExpression
+ {
+ public NotExpression(final BooleanExpression left)
+ {
+ super(left);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+ Boolean lvalue = (Boolean) right.evaluate(message);
+ if (lvalue == null)
+ {
+ return null;
+ }
+
+ return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "NOT";
+ }
+ }
+
+ private static class BooleanCastExpression extends BooleanUnaryExpression
+ {
+ public BooleanCastExpression(final Expression left)
+ {
+ super(left);
+ }
+
+ public Object evaluate(Filterable message)
+ {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ {
+ return null;
+ }
+
+ if (!rvalue.getClass().equals(Boolean.class))
+ {
+ return Boolean.FALSE;
+ }
+
+ return ((Boolean) rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public String toString()
+ {
+ return right.toString();
+ }
+
+ public String getExpressionSymbol()
+ {
+ return "";
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
new file mode 100644
index 0000000000..aa35cb5a76
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.Filterable;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Used to evaluate an XPath Expression in a JMS selector.
+ */
+public final class XPathExpression implements BooleanExpression {
+
+ private static final Logger log = Logger.getLogger(XPathExpression.class);
+ private static final String EVALUATOR_SYSTEM_PROPERTY = "org.apache.qpid.server.filter.XPathEvaluatorClassName";
+ private static final String DEFAULT_EVALUATOR_CLASS_NAME=XalanXPathEvaluator.class.getName();
+
+ private static final Constructor EVALUATOR_CONSTRUCTOR;
+
+ static {
+ String cn = System.getProperty(EVALUATOR_SYSTEM_PROPERTY, DEFAULT_EVALUATOR_CLASS_NAME);
+ Constructor m = null;
+ try {
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e) {
+ log.warn("Invalid "+XPathEvaluator.class.getName()+" implementation: "+cn+", reason: "+e,e);
+ cn = DEFAULT_EVALUATOR_CLASS_NAME;
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e2) {
+ log.error("Default XPath evaluator could not be loaded",e);
+ }
+ }
+ } finally {
+ EVALUATOR_CONSTRUCTOR = m;
+ }
+ }
+
+ private static Constructor getXPathEvaluatorConstructor(String cn) throws ClassNotFoundException, SecurityException, NoSuchMethodException {
+ Class c = XPathExpression.class.getClassLoader().loadClass(cn);
+ if( !XPathEvaluator.class.isAssignableFrom(c) ) {
+ throw new ClassCastException(""+c+" is not an instance of "+XPathEvaluator.class);
+ }
+ return c.getConstructor(new Class[]{String.class});
+ }
+
+ private final String xpath;
+ private final XPathEvaluator evaluator;
+
+ static public interface XPathEvaluator {
+ public boolean evaluate(Filterable message);
+ }
+
+ XPathExpression(String xpath) {
+ this.xpath = xpath;
+ this.evaluator = createEvaluator(xpath);
+ }
+
+ private XPathEvaluator createEvaluator(String xpath2) {
+ try {
+ return (XPathEvaluator)EVALUATOR_CONSTRUCTOR.newInstance(new Object[]{xpath});
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if( cause instanceof RuntimeException ) {
+ throw (RuntimeException)cause;
+ }
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ } catch (Throwable e) {
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ }
+ }
+
+ public Object evaluate(Filterable message) {
+// try {
+//FIXME this is flow to disk work
+// if( message.isDropped() )
+// return null;
+ return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE;
+// } catch (IOException e) {
+//
+// JMSException exception = new JMSException(e.getMessage());
+// exception.initCause(e);
+// throw exception;
+//
+// }
+
+ }
+
+ public String toString() {
+ return "XPATH "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws AMQException
+ */
+ public boolean matches(Filterable message)
+ {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
new file mode 100644
index 0000000000..ae22f17413
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.Filterable;
+
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+/**
+ * Used to evaluate an XQuery Expression in a JMS selector.
+ */
+public final class XQueryExpression implements BooleanExpression {
+ private final String xpath;
+
+ XQueryExpression(String xpath) {
+ super();
+ this.xpath = xpath;
+ }
+
+ public Object evaluate(Filterable message) {
+ return Boolean.FALSE;
+ }
+
+ public String toString() {
+ return "XQUERY "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws AMQException
+ */
+ public boolean matches(Filterable message)
+ {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
new file mode 100644
index 0000000000..f83eb63ac5
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
@@ -0,0 +1,101 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.ByteArrayInputStream;
+import java.io.StringReader;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.qpid.server.queue.Filterable;
+import org.apache.xpath.CachedXPathAPI;
+import org.w3c.dom.Document;
+import org.w3c.dom.traversal.NodeIterator;
+import org.xml.sax.InputSource;
+
+public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
+
+ private final String xpath;
+
+ public XalanXPathEvaluator(String xpath) {
+ this.xpath = xpath;
+ }
+
+ public boolean evaluate(Filterable m)
+ {
+ // TODO - we would have to check the content type and then evaluate the content
+ // here... is this really a feature we wish to implement? - RobG
+ /*
+
+ if( m instanceof TextMessage ) {
+ String text = ((TextMessage)m).getText();
+ return evaluate(text);
+ } else if ( m instanceof BytesMessage ) {
+ BytesMessage bm = (BytesMessage) m;
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return evaluate(data);
+ }
+ */
+ return false;
+
+ }
+
+ private boolean evaluate(byte[] data) {
+ try {
+
+ InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+
+ private boolean evaluate(String text) {
+ try {
+ InputSource inputSource = new InputSource(new StringReader(text));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ // We should associated the cachedXPathAPI object with the message being evaluated
+ // since that should speedup subsequent xpath expressions.
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+}