From 539571e411b4071a24e0568bc21d01eeae9706c6 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 6 Dec 2006 15:55:10 +0000 Subject: QPID-21 Broker Pom - Added geronimo jms jar - and javacc plugin for selectors. AMQMessage.java - Added decorator to extract JMS information. AMQQueue.java - Mainly whitespace - added filter argument to session registration Subscription.java - Added hasFilters and isInterested to enable the filtration process SubscriptionFactory.java - new constructor that takes the FieldTable of filters. {Test|Remote}Subscription{Impl}.java - Implementation of Subscription{Factory}.java features. SubscriptionSet.java - White space and added hasFilter/isInterested calls to determine nextSubscriber. AMQSession.java - Create FilterTable for basic.consume. AbstractJMSMessage.java - Updated println to provide further jms header details. URLHelper.java - whitespace URLSyntaxException.java - tidied up output amqp-8.0.xml - augmented consume to have argument field table Server/filter - Filtration classes from ActiveMQ and new FilterManager.java to drive them. message/jms - Decorator implementation MessageDecorator.java - Handles decoration of AMQMessages - currently hardwired but could be used to scan classpath. SelectorTest.java - Simple test to see that selectors work. Not exhaustive. FieldTableInTextMessageTest.java - Test that the Fieldtable (properties)in a TextMessage work as expected. (QPID-9) Reinvestigate wrt QPID-158 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/jmsselectors@483127 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/pom.xml | 24 + java/broker/src/main/grammar/SelectorParser.jj | 598 +++++++++++++++++++++ .../java/org/apache/qpid/server/AMQChannel.java | 5 +- .../qpid/server/filter/ArithmeticExpression.java | 216 ++++++++ .../qpid/server/filter/BinaryExpression.java | 100 ++++ .../qpid/server/filter/BooleanExpression.java | 45 ++ .../qpid/server/filter/ComparisonExpression.java | 465 ++++++++++++++++ .../qpid/server/filter/ConstantExpression.java | 170 ++++++ .../org/apache/qpid/server/filter/Expression.java | 42 ++ .../apache/qpid/server/filter/FilterManager.java | 35 ++ .../qpid/server/filter/FilterManagerFactory.java | 67 +++ .../qpid/server/filter/JMSSelectorFilter.java | 82 +++ .../apache/qpid/server/filter/LogicExpression.java | 95 ++++ .../apache/qpid/server/filter/MessageFilter.java | 30 ++ .../qpid/server/filter/PropertyExpression.java | 305 +++++++++++ .../qpid/server/filter/SimpleFilterManager.java | 72 +++ .../apache/qpid/server/filter/UnaryExpression.java | 263 +++++++++ .../apache/qpid/server/filter/XPathExpression.java | 130 +++++ .../qpid/server/filter/XQueryExpression.java | 56 ++ .../qpid/server/filter/XalanXPathEvaluator.java | 99 ++++ .../server/handler/BasicConsumeMethodHandler.java | 2 +- .../qpid/server/message/MessageDecorator.java | 25 + .../apache/qpid/server/message/jms/JMSMessage.java | 288 ++++++++++ .../org/apache/qpid/server/queue/AMQMessage.java | 76 ++- .../org/apache/qpid/server/queue/AMQQueue.java | 29 +- .../org/apache/qpid/server/queue/Subscription.java | 5 + .../qpid/server/queue/SubscriptionFactory.java | 4 + .../apache/qpid/server/queue/SubscriptionImpl.java | 39 +- .../apache/qpid/server/queue/SubscriptionSet.java | 77 ++- .../java/org/apache/qpid/client/AMQSession.java | 52 +- .../qpid/client/message/AbstractJMSMessage.java | 11 +- .../unit/basic/FieldTableInTextMessageTest.java | 160 ++++++ .../apache/qpid/test/unit/basic/SelectorTest.java | 140 +++++ .../qpid/server/queue/RemoteSubscriptionImpl.java | 12 +- .../main/java/org/apache/qpid/url/URLHelper.java | 8 +- .../org/apache/qpid/url/URLSyntaxException.java | 4 +- .../apache/qpid/server/queue/TestSubscription.java | 10 + specs/amqp-8.0.xml | 23 +- 38 files changed, 3780 insertions(+), 84 deletions(-) create mode 100644 java/broker/src/main/grammar/SelectorParser.jj create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java create mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java create mode 100644 java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java diff --git a/java/broker/pom.xml b/java/broker/pom.xml index 5a2c699cc1..f6dd171214 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -52,6 +52,10 @@ commons-lang commons-lang + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec org.apache.mina @@ -81,6 +85,26 @@ + + + org.codehaus.mojo + javacc-maven-plugin + 2.0 + + + generate-sources + + ${basedir}/src/main/grammar + ${basedir}/target/generated + org.apache.qpid.server.filter.jms.selector + + + javacc + + + + + org.apache.maven.plugins maven-surefire-plugin diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj new file mode 100644 index 0000000000..5553a46e47 --- /dev/null +++ b/java/broker/src/main/grammar/SelectorParser.jj @@ -0,0 +1,598 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + // + // Original File from r450141 of the Apache ActiveMQ project + // + +// ---------------------------------------------------------------------------- +// OPTIONS +// ---------------------------------------------------------------------------- +options { + STATIC = false; + UNICODE_INPUT = true; + + // some performance optimizations + OPTIMIZE_TOKEN_MANAGER = true; + ERROR_REPORTING = false; +} + +// ---------------------------------------------------------------------------- +// PARSER +// ---------------------------------------------------------------------------- + +PARSER_BEGIN(SelectorParser) +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.filter.jms.selector; + +import java.io.*; +import java.util.*; + +import javax.jms.InvalidSelectorException; + +import org.apache.qpid.server.filter.*; + +/** + * JMS Selector Parser generated by JavaCC + * + * Do not edit this .java file directly - it is autogenerated from SelectorParser.jj + */ +public class SelectorParser { + + public SelectorParser() { + this(new StringReader("")); + } + + public BooleanExpression parse(String sql) throws InvalidSelectorException { + this.ReInit(new StringReader(sql)); + + try { + return this.JmsSelector(); + } + catch (Throwable e) { + throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e); + } + + } + + private BooleanExpression asBooleanExpression(Expression value) throws ParseException { + if (value instanceof BooleanExpression) { + return (BooleanExpression) value; + } + if (value instanceof PropertyExpression) { + return UnaryExpression.createBooleanCast( value ); + } + throw new ParseException("Expression will not result in a boolean value: " + value); + } + + +} + +PARSER_END(SelectorParser) + +// ---------------------------------------------------------------------------- +// Tokens +// ---------------------------------------------------------------------------- + +/* White Space */ +SPECIAL_TOKEN : +{ + " " | "\t" | "\n" | "\r" | "\f" +} + +/* Comments */ +SKIP: +{ + +} + +SKIP: +{ + +} + +/* Reserved Words */ +TOKEN [IGNORE_CASE] : +{ + < NOT : "NOT"> + | < AND : "AND"> + | < OR : "OR"> + | < BETWEEN : "BETWEEN"> + | < LIKE : "LIKE"> + | < ESCAPE : "ESCAPE"> + | < IN : "IN"> + | < IS : "IS"> + | < TRUE : "TRUE" > + | < FALSE : "FALSE" > + | < NULL : "NULL" > + | < XPATH : "XPATH" > + | < XQUERY : "XQUERY" > +} + +/* Literals */ +TOKEN [IGNORE_CASE] : +{ + + < DECIMAL_LITERAL: ["1"-"9"] (["0"-"9"])* (["l","L"])? > + | < HEX_LITERAL: "0" ["x","X"] (["0"-"9","a"-"f","A"-"F"])+ > + | < OCTAL_LITERAL: "0" (["0"-"7"])* > + | < FLOATING_POINT_LITERAL: + (["0"-"9"])+ "." (["0"-"9"])* ()? // matches: 5.5 or 5. or 5.5E10 or 5.E10 + | "." (["0"-"9"])+ ()? // matches: .5 or .5E10 + | (["0"-"9"])+ // matches: 5E10 + > + | < #EXPONENT: "E" (["+","-"])? (["0"-"9"])+ > + | < STRING_LITERAL: "'" ( ("''") | ~["'"] )* "'" > +} + +TOKEN [IGNORE_CASE] : +{ + < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* > +} + +// ---------------------------------------------------------------------------- +// Grammer +// ---------------------------------------------------------------------------- +BooleanExpression JmsSelector() : +{ + Expression left=null; +} +{ + ( + left = orExpression() + ) + { + return asBooleanExpression(left); + } + +} + +Expression orExpression() : +{ + Expression left; + Expression right; +} +{ + ( + left = andExpression() + ( + right = andExpression() + { + left = LogicExpression.createOR(asBooleanExpression(left), asBooleanExpression(right)); + } + )* + ) + { + return left; + } + +} + + +Expression andExpression() : +{ + Expression left; + Expression right; +} +{ + ( + left = equalityExpression() + ( + right = equalityExpression() + { + left = LogicExpression.createAND(asBooleanExpression(left), asBooleanExpression(right)); + } + )* + ) + { + return left; + } +} + +Expression equalityExpression() : +{ + Expression left; + Expression right; +} +{ + ( + left = comparisonExpression() + ( + + "=" right = comparisonExpression() + { + left = ComparisonExpression.createEqual(left, right); + } + | + "<>" right = comparisonExpression() + { + left = ComparisonExpression.createNotEqual(left, right); + } + | + LOOKAHEAD(2) + + { + left = ComparisonExpression.createIsNull(left); + } + | + + { + left = ComparisonExpression.createIsNotNull(left); + } + )* + ) + { + return left; + } +} + +Expression comparisonExpression() : +{ + Expression left; + Expression right; + Expression low; + Expression high; + String t, u; + boolean not; + ArrayList list; +} +{ + ( + left = addExpression() + ( + + ">" right = addExpression() + { + left = ComparisonExpression.createGreaterThan(left, right); + } + | + ">=" right = addExpression() + { + left = ComparisonExpression.createGreaterThanEqual(left, right); + } + | + "<" right = addExpression() + { + left = ComparisonExpression.createLessThan(left, right); + } + | + "<=" right = addExpression() + { + left = ComparisonExpression.createLessThanEqual(left, right); + } + | + { + u=null; + } + t = stringLitteral() + [ u = stringLitteral() ] + { + left = ComparisonExpression.createLike(left, t, u); + } + | + LOOKAHEAD(2) + { + u=null; + } + t = stringLitteral() [ u = stringLitteral() ] + { + left = ComparisonExpression.createNotLike(left, t, u); + } + | + low = addExpression() high = addExpression() + { + left = ComparisonExpression.createBetween(left, low, high); + } + | + LOOKAHEAD(2) + low = addExpression() high = addExpression() + { + left = ComparisonExpression.createNotBetween(left, low, high); + } + | + + "(" + t = stringLitteral() + { + list = new ArrayList(); + list.add( t ); + } + ( + "," + t = stringLitteral() + { + list.add( t ); + } + + )* + ")" + { + left = ComparisonExpression.createInFilter(left, list); + } + | + LOOKAHEAD(2) + + "(" + t = stringLitteral() + { + list = new ArrayList(); + list.add( t ); + } + ( + "," + t = stringLitteral() + { + list.add( t ); + } + + )* + ")" + { + left = ComparisonExpression.createNotInFilter(left, list); + } + + )* + ) + { + return left; + } +} + +Expression addExpression() : +{ + Expression left; + Expression right; +} +{ + left = multExpr() + ( + LOOKAHEAD( ("+"|"-") multExpr()) + ( + "+" right = multExpr() + { + left = ArithmeticExpression.createPlus(left, right); + } + | + "-" right = multExpr() + { + left = ArithmeticExpression.createMinus(left, right); + } + ) + + )* + { + return left; + } +} + +Expression multExpr() : +{ + Expression left; + Expression right; +} +{ + left = unaryExpr() + ( + "*" right = unaryExpr() + { + left = ArithmeticExpression.createMultiply(left, right); + } + | + "/" right = unaryExpr() + { + left = ArithmeticExpression.createDivide(left, right); + } + | + "%" right = unaryExpr() + { + left = ArithmeticExpression.createMod(left, right); + } + + )* + { + return left; + } +} + + +Expression unaryExpr() : +{ + String s=null; + Expression left=null; +} +{ + ( + LOOKAHEAD( "+" unaryExpr() ) + "+" left=unaryExpr() + | + "-" left=unaryExpr() + { + left = UnaryExpression.createNegate(left); + } + | + left=unaryExpr() + { + left = UnaryExpression.createNOT( asBooleanExpression(left) ); + } + | + s=stringLitteral() + { + left = UnaryExpression.createXPath( s ); + } + | + s=stringLitteral() + { + left = UnaryExpression.createXQuery( s ); + } + | + left = primaryExpr() + ) + { + return left; + } + +} + +Expression primaryExpr() : +{ + Expression left=null; +} +{ + ( + left = literal() + | + left = variable() + | + "(" left = orExpression() ")" + ) + { + return left; + } +} + + + +ConstantExpression literal() : +{ + Token t; + String s; + ConstantExpression left=null; +} +{ + ( + ( + s = stringLitteral() + { + left = new ConstantExpression(s); + } + ) + | + ( + t = + { + left = ConstantExpression.createFromDecimal(t.image); + } + ) + | + ( + t = + { + left = ConstantExpression.createFromHex(t.image); + } + ) + | + ( + t = + { + left = ConstantExpression.createFromOctal(t.image); + } + ) + | + ( + t = + { + left = ConstantExpression.createFloat(t.image); + } + ) + | + ( + + { + left = ConstantExpression.TRUE; + } + ) + | + ( + + { + left = ConstantExpression.FALSE; + } + ) + | + ( + + { + left = ConstantExpression.NULL; + } + ) + ) + { + return left; + } +} + +String stringLitteral() : +{ + Token t; + StringBuffer rc = new StringBuffer(); + boolean first=true; +} +{ + t = + { + // Decode the sting value. + String image = t.image; + for( int i=1; i < image.length()-1; i++ ) { + char c = image.charAt(i); + if( c == '\'' ) + i++; + rc.append(c); + } + return rc.toString(); + } +} + +PropertyExpression variable() : +{ + Token t; + PropertyExpression left=null; +} +{ + ( + t = + { + left = new PropertyExpression(t.image); + } + ) + { + return left; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a6cb4523cf..270f75e86e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; @@ -290,7 +291,7 @@ public class AMQChannel * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException + public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -301,7 +302,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks); + queue.registerProtocolSession(session, _channelId, tag, acks, filters); _consumerTag2QueueMap.put(tag, queue); return tag; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java new file mode 100644 index 0000000000..0aa5739c1c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java @@ -0,0 +1,216 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + + +import org.apache.qpid.server.queue.AMQMessage; + +import javax.jms.JMSException; + +/** + * An expression which performs an operation on two expression values + * + * @version $Revision$ + */ +public abstract class ArithmeticExpression extends BinaryExpression { + + protected static final int INTEGER = 1; + protected static final int LONG = 2; + protected static final int DOUBLE = 3; + + /** + * @param left + * @param right + */ + public ArithmeticExpression(Expression left, Expression right) { + super(left, right); + } + + public static Expression createPlus(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof String) { + String text = (String) lvalue; + String answer = text + rvalue; + return answer; + } + else if (lvalue instanceof Number) { + return plus((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "+"; + } + }; + } + + public static Expression createMinus(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return minus((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "-"; + } + }; + } + + public static Expression createMultiply(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return multiply((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "*"; + } + }; + } + + public static Expression createDivide(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return divide((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "/"; + } + }; + } + + public static Expression createMod(Expression left, Expression right) { + return new ArithmeticExpression(left, right) { + + protected Object evaluate(Object lvalue, Object rvalue) { + if (lvalue instanceof Number) { + return mod((Number) lvalue, asNumber(rvalue)); + } + throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue); + } + + public String getExpressionSymbol() { + return "%"; + } + }; + } + + protected Number plus(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() + right.intValue()); + case LONG: + return new Long(left.longValue() + right.longValue()); + default: + return new Double(left.doubleValue() + right.doubleValue()); + } + } + + protected Number minus(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() - right.intValue()); + case LONG: + return new Long(left.longValue() - right.longValue()); + default: + return new Double(left.doubleValue() - right.doubleValue()); + } + } + + protected Number multiply(Number left, Number right) { + switch (numberType(left, right)) { + case INTEGER: + return new Integer(left.intValue() * right.intValue()); + case LONG: + return new Long(left.longValue() * right.longValue()); + default: + return new Double(left.doubleValue() * right.doubleValue()); + } + } + + protected Number divide(Number left, Number right) { + return new Double(left.doubleValue() / right.doubleValue()); + } + + protected Number mod(Number left, Number right) { + return new Double(left.doubleValue() % right.doubleValue()); + } + + private int numberType(Number left, Number right) { + if (isDouble(left) || isDouble(right)) { + return DOUBLE; + } + else if (left instanceof Long || right instanceof Long) { + return LONG; + } + else { + return INTEGER; + } + } + + private boolean isDouble(Number n) { + return n instanceof Float || n instanceof Double; + } + + protected Number asNumber(Object value) { + if (value instanceof Number) { + return (Number) value; + } + else { + throw new RuntimeException("Cannot convert value: " + value + " into a number"); + } + } + + public Object evaluate(AMQMessage message) throws JMSException { + Object lvalue = left.evaluate(message); + if (lvalue == null) { + return null; + } + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + return evaluate(lvalue, rvalue); + } + + + /** + * @param lvalue + * @param rvalue + * @return + */ + abstract protected Object evaluate(Object lvalue, Object rvalue); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java new file mode 100644 index 0000000000..4256ab9189 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java @@ -0,0 +1,100 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + + + +/** + * An expression which performs an operation on two expression values. + * + * @version $Revision$ + */ +abstract public class BinaryExpression implements Expression { + protected Expression left; + protected Expression right; + + public BinaryExpression(Expression left, Expression right) { + this.left = left; + this.right = right; + } + + public Expression getLeft() { + return left; + } + + public Expression getRight() { + return right; + } + + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + abstract public String getExpressionSymbol(); + + /** + * @param expression + */ + public void setRight(Expression expression) { + right = expression; + } + + /** + * @param expression + */ + public void setLeft(Expression expression) { + left = expression; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java new file mode 100644 index 0000000000..b66de3fbc5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import javax.jms.JMSException; + + +/** + * A BooleanExpression is an expression that always + * produces a Boolean result. + * + * @version $Revision$ + */ +public interface BooleanExpression extends Expression { + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws JMSException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java new file mode 100644 index 0000000000..13d278cf65 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java @@ -0,0 +1,465 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +import javax.jms.JMSException; + +/** + * A filter performing a comparison of two objects + * + * @version $Revision$ + */ +public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression { + + public static BooleanExpression createBetween(Expression value, Expression left, Expression right) { + return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right)); + } + + public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) { + return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right)); + } + + static final private HashSet REGEXP_CONTROL_CHARS = new HashSet(); + + static { + REGEXP_CONTROL_CHARS.add(new Character('.')); + REGEXP_CONTROL_CHARS.add(new Character('\\')); + REGEXP_CONTROL_CHARS.add(new Character('[')); + REGEXP_CONTROL_CHARS.add(new Character(']')); + REGEXP_CONTROL_CHARS.add(new Character('^')); + REGEXP_CONTROL_CHARS.add(new Character('$')); + REGEXP_CONTROL_CHARS.add(new Character('?')); + REGEXP_CONTROL_CHARS.add(new Character('*')); + REGEXP_CONTROL_CHARS.add(new Character('+')); + REGEXP_CONTROL_CHARS.add(new Character('{')); + REGEXP_CONTROL_CHARS.add(new Character('}')); + REGEXP_CONTROL_CHARS.add(new Character('|')); + REGEXP_CONTROL_CHARS.add(new Character('(')); + REGEXP_CONTROL_CHARS.add(new Character(')')); + REGEXP_CONTROL_CHARS.add(new Character(':')); + REGEXP_CONTROL_CHARS.add(new Character('&')); + REGEXP_CONTROL_CHARS.add(new Character('<')); + REGEXP_CONTROL_CHARS.add(new Character('>')); + REGEXP_CONTROL_CHARS.add(new Character('=')); + REGEXP_CONTROL_CHARS.add(new Character('!')); + } + + static class LikeExpression extends UnaryExpression implements BooleanExpression { + + Pattern likePattern; + + /** + * @param right + */ + public LikeExpression(Expression right, String like, int escape) { + super(right); + + StringBuffer regexp = new StringBuffer(like.length() * 2); + regexp.append("\\A"); // The beginning of the input + for (int i = 0; i < like.length(); i++) { + char c = like.charAt(i); + if (escape == (0xFFFF & c)) { + i++; + if (i >= like.length()) { + // nothing left to escape... + break; + } + + char t = like.charAt(i); + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & t)); + } + else if (c == '%') { + regexp.append(".*?"); // Do a non-greedy match + } + else if (c == '_') { + regexp.append("."); // match one + } + else if (REGEXP_CONTROL_CHARS.contains(new Character(c))) { + regexp.append("\\x"); + regexp.append(Integer.toHexString(0xFFFF & c)); + } + else { + regexp.append(c); + } + } + regexp.append("\\z"); // The end of the input + + likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL); + } + + /** + * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol() + */ + public String getExpressionSymbol() { + return "LIKE"; + } + + /** + * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) + */ + public Object evaluate(AMQMessage message) throws JMSException { + + Object rv = this.getRight().evaluate(message); + + if (rv == null) { + return null; + } + + if (!(rv instanceof String)) { + return Boolean.FALSE; + //throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass()); + } + + return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; + } + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + } + + public static BooleanExpression createLike(Expression left, String right, String escape) { + if (escape != null && escape.length() != 1) { + throw new RuntimeException("The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape); + } + int c = -1; + if (escape != null) { + c = 0xFFFF & escape.charAt(0); + } + + return new LikeExpression(left, right, c); + } + + public static BooleanExpression createNotLike(Expression left, String right, String escape) { + return UnaryExpression.createNOT(createLike(left, right, escape)); + } + + public static BooleanExpression createInFilter(Expression left, List elements) { + + if( !(left instanceof PropertyExpression) ) + throw new RuntimeException("Expected a property for In expression, got: "+left); + return UnaryExpression.createInExpression((PropertyExpression)left, elements, false); + + } + + public static BooleanExpression createNotInFilter(Expression left, List elements) { + + if( !(left instanceof PropertyExpression) ) + throw new RuntimeException("Expected a property for In expression, got: "+left); + return UnaryExpression.createInExpression((PropertyExpression)left, elements, true); + + } + + public static BooleanExpression createIsNull(Expression left) { + return doCreateEqual(left, ConstantExpression.NULL); + } + + public static BooleanExpression createIsNotNull(Expression left) { + return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL)); + } + + public static BooleanExpression createNotEqual(Expression left, Expression right) { + return UnaryExpression.createNOT(createEqual(left, right)); + } + + public static BooleanExpression createEqual(Expression left, Expression right) { + checkEqualOperand(left); + checkEqualOperand(right); + checkEqualOperandCompatability(left, right); + return doCreateEqual(left, right); + } + + private static BooleanExpression doCreateEqual(Expression left, Expression right) { + return new ComparisonExpression(left, right) { + + public Object evaluate(AMQMessage message) throws JMSException { + Object lv = left.evaluate(message); + Object rv = right.evaluate(message); + + // Iff one of the values is null + if (lv == null ^ rv == null) { + return Boolean.FALSE; + } + if (lv == rv || lv.equals(rv)) { + return Boolean.TRUE; + } + if( lv instanceof Comparable && rv instanceof Comparable ) { + return compare((Comparable)lv, (Comparable)rv); + } + return Boolean.FALSE; + } + + protected boolean asBoolean(int answer) { + return answer == 0; + } + + public String getExpressionSymbol() { + return "="; + } + }; + } + + public static BooleanExpression createGreaterThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer > 0; + } + + public String getExpressionSymbol() { + return ">"; + } + }; + } + + public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + protected boolean asBoolean(int answer) { + return answer >= 0; + } + + public String getExpressionSymbol() { + return ">="; + } + }; + } + + public static BooleanExpression createLessThan(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer < 0; + } + + public String getExpressionSymbol() { + return "<"; + } + + }; + } + + public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) { + checkLessThanOperand(left); + checkLessThanOperand(right); + return new ComparisonExpression(left, right) { + + protected boolean asBoolean(int answer) { + return answer <= 0; + } + + public String getExpressionSymbol() { + return "<="; + } + }; + } + + /** + * Only Numeric expressions can be used in >, >=, < or <= expressions.s + * + * @param expr + */ + public static void checkLessThanOperand(Expression expr ) { + if( expr instanceof ConstantExpression ) { + Object value = ((ConstantExpression)expr).getValue(); + if( value instanceof Number ) + return; + + // Else it's boolean or a String.. + throw new RuntimeException("Value '"+expr+"' cannot be compared."); + } + if( expr instanceof BooleanExpression ) { + throw new RuntimeException("Value '"+expr+"' cannot be compared."); + } + } + + /** + * Validates that the expression can be used in == or <> expression. + * Cannot not be NULL TRUE or FALSE litterals. + * + * @param expr + */ + public static void checkEqualOperand(Expression expr ) { + if( expr instanceof ConstantExpression ) { + Object value = ((ConstantExpression)expr).getValue(); + if( value == null ) + throw new RuntimeException("'"+expr+"' cannot be compared."); + } + } + + /** + * + * @param left + * @param right + */ + private static void checkEqualOperandCompatability(Expression left, Expression right) { + if( left instanceof ConstantExpression && right instanceof ConstantExpression ) { + if( left instanceof BooleanExpression && !(right instanceof BooleanExpression) ) + throw new RuntimeException("'"+left+"' cannot be compared with '"+right+"'"); + } + } + + + + /** + * @param left + * @param right + */ + public ComparisonExpression(Expression left, Expression right) { + super(left, right); + } + + public Object evaluate(AMQMessage message) throws JMSException { + Comparable lv = (Comparable) left.evaluate(message); + if (lv == null) { + return null; + } + Comparable rv = (Comparable) right.evaluate(message); + if (rv == null) { + return null; + } + return compare(lv, rv); + } + + protected Boolean compare(Comparable lv, Comparable rv) { + Class lc = lv.getClass(); + Class rc = rv.getClass(); + // If the the objects are not of the same type, + // try to convert up to allow the comparison. + if (lc != rc) { + if (lc == Byte.class) { + if (rc == Short.class) { + lv = new Short(((Number) lv).shortValue()); + } + else if (rc == Integer.class) { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } else if (lc == Short.class) { + if (rc == Integer.class) { + lv = new Integer(((Number) lv).intValue()); + } + else if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } else if (lc == Integer.class) { + if (rc == Long.class) { + lv = new Long(((Number) lv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Long.class) { + if (rc == Integer.class) { + rv = new Long(((Number) rv).longValue()); + } + else if (rc == Float.class) { + lv = new Float(((Number) lv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Float.class) { + if (rc == Integer.class) { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Long.class) { + rv = new Float(((Number) rv).floatValue()); + } + else if (rc == Double.class) { + lv = new Double(((Number) lv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else if (lc == Double.class) { + if (rc == Integer.class) { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Long.class) { + rv = new Double(((Number) rv).doubleValue()); + } + else if (rc == Float.class) { + rv = new Float(((Number) rv).doubleValue()); + } + else { + return Boolean.FALSE; + } + } + else + return Boolean.FALSE; + } + return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE; + } + + protected abstract boolean asBoolean(int answer); + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java new file mode 100644 index 0000000000..9bde712da2 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java @@ -0,0 +1,170 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import java.math.BigDecimal; + +import javax.jms.JMSException; + +/** + * Represents a constant expression + * + * @version $Revision$ + */ +public class ConstantExpression implements Expression { + + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression { + public BooleanConstantExpression(Object value) { + super(value); + } + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + } + + public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null); + public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE); + public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE); + + private Object value; + + public static ConstantExpression createFromDecimal(String text) { + + // Strip off the 'l' or 'L' if needed. + if( text.endsWith("l") || text.endsWith("L") ) + text = text.substring(0, text.length()-1); + + Number value; + try { + value = new Long(text); + } catch ( NumberFormatException e) { + // The number may be too big to fit in a long. + value = new BigDecimal(text); + } + + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFromHex(String text) { + Number value = new Long(Long.parseLong(text.substring(2), 16)); + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFromOctal(String text) { + Number value = new Long(Long.parseLong(text, 8)); + long l = value.longValue(); + if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + value = new Integer(value.intValue()); + } + return new ConstantExpression(value); + } + + public static ConstantExpression createFloat(String text) { + Number value = new Double(text); + return new ConstantExpression(value); + } + + public ConstantExpression(Object value) { + this.value = value; + } + + public Object evaluate(AMQMessage message) throws JMSException { + return value; + } + + public Object getValue() { + return value; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + if (value == null) { + return "NULL"; + } + if (value instanceof Boolean) { + return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE"; + } + if (value instanceof String) { + return encodeString((String) value); + } + return value.toString(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + + /** + * Encodes the value of string so that it looks like it would look like + * when it was provided in a selector. + * + * @param s + * @return + */ + public static String encodeString(String s) { + StringBuffer b = new StringBuffer(); + b.append('\''); + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c == '\'') { + b.append(c); + } + b.append(c); + } + b.append('\''); + return b.toString(); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java new file mode 100644 index 0000000000..a15c15fb91 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import javax.jms.JMSException; + + +/** + * Represents an expression + * + * @version $Revision$ + */ +public interface Expression { + + /** + * @return the value of this expression + */ + public Object evaluate(AMQMessage message) throws JMSException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java new file mode 100644 index 0000000000..e700bdfe55 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; + +public interface FilterManager +{ + void add(MessageFilter filter); + + void remove(MessageFilter filter); + + boolean allAllow(AMQMessage msg); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java new file mode 100644 index 0000000000..5c106de6f2 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.framing.FieldTable; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import java.util.Iterator; + + +public class FilterManagerFactory +{ + //private final static Logger _logger = LoggerFactory.getLogger(FilterManagerFactory.class); + private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class); + + //fixme move to a common class so it can be refered to from client code. + private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector"; + + public static FilterManager createManager(FieldTable filters) + { + FilterManager manager = null; + + if (filters != null) + { + + manager = new SimpleFilterManager(); + + Iterator it = filters.keySet().iterator(); + _logger.info("Processing filters:"); + while (it.hasNext()) + { + String key = (String) it.next(); + _logger.info("filter:" + key); + if (key.equals(JMS_SELECTOR_FILTER)) + { + manager.add(new JMSSelectorFilter((String) filters.get(key))); + } + + } + } + else + { + _logger.info("No Filters found."); + } + + return manager; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java new file mode 100644 index 0000000000..e54fdb944d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.filter.jms.selector.SelectorParser; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.log4j.Logger; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +public class JMSSelectorFilter implements MessageFilter +{ + + private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); +// LoggerFactory.getLogger(JMSSelectorFilter.class); + + private String _selector; + private BooleanExpression _matcher; + + public JMSSelectorFilter(String selector) + { + _selector = selector; + _logger.info("Created JMSSelectorFilter with selector:" + _selector); + + // BooleanExpression activemqSelctor = new ActiveMQSelectorParser(selector); + + try + { + _matcher = new SelectorParser().parse(selector); + } + catch (InvalidSelectorException e) + { + // fixme + // Will have to throw this back to the client... in the future + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + } + + public boolean matches(AMQMessage message) + { + try + { + boolean match = _matcher.matches(message); + _logger.info(message + " match(" + match + ") selector:" + _selector); + return match; + } + catch (JMSException e) + { + //fixme this needs to be sorted.. it shouldn't happen + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + return false; + } + + public String getSelector() + { + return _selector; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java new file mode 100644 index 0000000000..714d8c23f5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java @@ -0,0 +1,95 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import javax.jms.JMSException; + +/** + * A filter performing a comparison of two objects + * + * @version $Revision$ + */ +public abstract class LogicExpression extends BinaryExpression implements BooleanExpression { + + public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) { + return new LogicExpression(lvalue, rvalue) { + + public Object evaluate(AMQMessage message) throws JMSException { + + Boolean lv = (Boolean) left.evaluate(message); + // Can we do an OR shortcut?? + if (lv !=null && lv.booleanValue()) { + return Boolean.TRUE; + } + + Boolean rv = (Boolean) right.evaluate(message); + return rv==null ? null : rv; + } + + public String getExpressionSymbol() { + return "OR"; + } + }; + } + + public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) { + return new LogicExpression(lvalue, rvalue) { + + public Object evaluate(AMQMessage message) throws JMSException { + + Boolean lv = (Boolean) left.evaluate(message); + + // Can we do an AND shortcut?? + if (lv == null) + return null; + if (!lv.booleanValue()) { + return Boolean.FALSE; + } + + Boolean rv = (Boolean) right.evaluate(message); + return rv == null ? null : rv; + } + + public String getExpressionSymbol() { + return "AND"; + } + }; + } + + /** + * @param left + * @param right + */ + public LogicExpression(BooleanExpression left, BooleanExpression right) { + super(left, right); + } + + abstract public Object evaluate(AMQMessage message) throws JMSException; + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java new file mode 100644 index 0000000000..b8ca75d209 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; + +import javax.jms.JMSException; + +public interface MessageFilter +{ + boolean matches(AMQMessage message) throws JMSException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java new file mode 100644 index 0000000000..f3e9965c2e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -0,0 +1,305 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import java.io.IOException; +import java.util.HashMap; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; + +//import org.apache.activemq.command.ActiveMQDestination; +//import org.apache.activemq.command.Message; +//import org.apache.activemq.command.TransactionId; +//import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.log4j.Logger; + +/** + * Represents a property expression + * + * @version $Revision$ + */ +public class PropertyExpression implements Expression +{ + + interface SubExpression + { + public Object evaluate(AMQMessage message); + } + + interface JMSExpression + { + public abstract Object evaluate(JMSMessage message); + } + + static class SubJMSExpression implements SubExpression + { + JMSExpression _expression; + + SubJMSExpression(JMSExpression expression) + { + _expression = expression; + } + + + public Object evaluate(AMQMessage message) + { + JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE); + if (msg != null) + { + return _expression.evaluate(msg); + } + else + { + return null; + } + } + } + + private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class); + + + static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap(); + + static + { + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSDestination(); + } + } + )); +// +// public Object evaluate(AMQMessage message) +// { +// //fixme +// +// +//// AMQDestination dest = message.getOriginalDestination(); +//// if (dest == null) +//// { +//// dest = message.getDestination(); +//// } +//// if (dest == null) +//// { +//// return null; +//// } +//// return dest.toString(); +// return ""; +// } +// }); + JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSReplyTo(); + } + }) + ); + + JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSType(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + try + { + Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + _logger.info("JMSDeliveryMode is :" + mode); + return mode; + } + catch (AMQException e) + { + //shouldn't happen + } + + return DeliveryMode.NON_PERSISTENT; + } + })); + + JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSPriority(); + } + } + )); + + + JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getAMQMessage().getMessageId(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSTimestamp(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSCorrelationID(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getJMSExpiration(); + } + } + )); + + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression( + new JMSExpression() + { + public Object evaluate(JMSMessage message) + { + return message.getAMQMessage().isRedelivered(); + } + } + )); + + } + + private final String name; + private final SubExpression jmsPropertyExpression; + + public PropertyExpression(String name) + { + this.name = name; + jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name); + } + + public Object evaluate(AMQMessage message) throws JMSException + { +// try +// { +// if (message.isDropped()) +// { +// return null; +// } + + if (jmsPropertyExpression != null) + { + return jmsPropertyExpression.evaluate(message); + } +// try + else + { + + BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties; + + _logger.info("Looking up property:" + name); + _logger.info("Properties are:" + _properties.getHeaders().keySet()); + + return _properties.getHeaders().get(name); + } +// catch (IOException ioe) +// { +// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage()); +// exception.initCause(ioe); +// throw exception; +// } +// } +// catch (IOException e) +// { +// JMSException exception = new JMSException(e.getMessage()); +// exception.initCause(e); +// throw exception; +// } + + } + + public String getName() + { + return name; + } + + + /** + * @see java.lang.Object#toString() + */ + public String toString() + { + return name; + } + + /** + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return name.hashCode(); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) + { + + if (o == null || !this.getClass().equals(o.getClass())) + { + return false; + } + return name.equals(((PropertyExpression) o).name); + + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java new file mode 100644 index 0000000000..1eba0b9fed --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class SimpleFilterManager implements FilterManager +{ + private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class); + + private final ConcurrentLinkedQueue _filters; + + public SimpleFilterManager() + { + _logger.debug("Creating SimpleFilterManager"); + _filters = new ConcurrentLinkedQueue(); + } + + public void add(MessageFilter filter) + { + _filters.add(filter); + } + + public void remove(MessageFilter filter) + { + _filters.remove(filter); + } + + public boolean allAllow(AMQMessage msg) + { + for (MessageFilter filter : _filters) + { + try + { + if (!filter.matches(msg)) + { + return false; + } + } + catch (JMSException e) + { + //fixme + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return false; + } + } + return true; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java new file mode 100644 index 0000000000..49ff147411 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java @@ -0,0 +1,263 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.jms.JMSMessage; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import javax.jms.JMSException; + +/** + * An expression which performs an operation on two expression values + * + * @version $Revision$ + */ +public abstract class UnaryExpression implements Expression { + + private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); + protected Expression right; + + public static Expression createNegate(Expression left) { + return new UnaryExpression(left) { + public Object evaluate(AMQMessage message) throws JMSException { + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + if (rvalue instanceof Number) { + return negate((Number) rvalue); + } + return null; + } + + public String getExpressionSymbol() { + return "-"; + } + }; + } + + public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) { + + // Use a HashSet if there are many elements. + Collection t; + if( elements.size()==0 ) + t=null; + else if( elements.size() < 5 ) + t = elements; + else { + t = new HashSet(elements); + } + final Collection inList = t; + + return new BooleanUnaryExpression(right) { + public Object evaluate(AMQMessage message) throws JMSException { + + Object rvalue = right.evaluate(message); + if (rvalue == null) { + return null; + } + if( rvalue.getClass()!=String.class ) + return null; + + if( (inList!=null && inList.contains(rvalue)) ^ not ) { + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } + + } + + public String toString() { + StringBuffer answer = new StringBuffer(); + answer.append(right); + answer.append(" "); + answer.append(getExpressionSymbol()); + answer.append(" ( "); + + int count=0; + for (Iterator i = inList.iterator(); i.hasNext();) { + Object o = (Object) i.next(); + if( count!=0 ) { + answer.append(", "); + } + answer.append(o); + count++; + } + + answer.append(" )"); + return answer.toString(); + } + + public String getExpressionSymbol() { + if( not ) + return "NOT IN"; + else + return "IN"; + } + }; + } + + abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression { + public BooleanUnaryExpression(Expression left) { + super(left); + } + + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + }; + + + public static BooleanExpression createNOT(BooleanExpression left) { + return new BooleanUnaryExpression(left) { + public Object evaluate(AMQMessage message) throws JMSException { + Boolean lvalue = (Boolean) right.evaluate(message); + if (lvalue == null) { + return null; + } + return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE; + } + + public String getExpressionSymbol() { + return "NOT"; + } + }; + } + + public static BooleanExpression createXPath(final String xpath) { + return new XPathExpression(xpath); + } + + public static BooleanExpression createXQuery(final String xpath) { + return new XQueryExpression(xpath); + } + + public static BooleanExpression createBooleanCast(Expression left) { + return new BooleanUnaryExpression(left) { + public Object evaluate(AMQMessage message) throws JMSException { + Object rvalue = right.evaluate(message); + if (rvalue == null) + return null; + if (!rvalue.getClass().equals(Boolean.class)) + return Boolean.FALSE; + return ((Boolean)rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE; + } + + public String toString() { + return right.toString(); + } + + public String getExpressionSymbol() { + return ""; + } + }; + } + + private static Number negate(Number left) { + Class clazz = left.getClass(); + if (clazz == Integer.class) { + return new Integer(-left.intValue()); + } + else if (clazz == Long.class) { + return new Long(-left.longValue()); + } + else if (clazz == Float.class) { + return new Float(-left.floatValue()); + } + else if (clazz == Double.class) { + return new Double(-left.doubleValue()); + } + else if (clazz == BigDecimal.class) { + // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the + // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it + // as a Big decimal. But it gets Negated right away.. to here we try to covert it back + // to a Long. + BigDecimal bd = (BigDecimal)left; + bd = bd.negate(); + + if( BD_LONG_MIN_VALUE.compareTo(bd)==0 ) { + return new Long(Long.MIN_VALUE); + } + return bd; + } + else { + throw new RuntimeException("Don't know how to negate: "+left); + } + } + + public UnaryExpression(Expression left) { + this.right = left; + } + + public Expression getRight() { + return right; + } + + public void setRight(Expression expression) { + right = expression; + } + + /** + * @see java.lang.Object#toString() + */ + public String toString() { + return "(" + getExpressionSymbol() + " " + right.toString() + ")"; + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return toString().hashCode(); + } + + /** + * TODO: more efficient hashCode() + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object o) { + + if (o == null || !this.getClass().equals(o.getClass())) { + return false; + } + return toString().equals(o.toString()); + + } + + /** + * Returns the symbol that represents this binary expression. For example, addition is + * represented by "+" + * + * @return + */ + abstract public String getExpressionSymbol(); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java new file mode 100644 index 0000000000..ab952b6fea --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import javax.jms.JMSException; + +//import org.apache.activemq.command.Message; +//import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.qpid.server.queue.AMQMessage; + +/** + * Used to evaluate an XPath Expression in a JMS selector. + */ +public final class XPathExpression implements BooleanExpression { + + private static final Log log = LogFactory.getLog(XPathExpression.class); + private static final String EVALUATOR_SYSTEM_PROPERTY = "org.apache.qpid.server.filter.XPathEvaluatorClassName"; + private static final String DEFAULT_EVALUATOR_CLASS_NAME=XalanXPathEvaluator.class.getName(); + + private static final Constructor EVALUATOR_CONSTRUCTOR; + + static { + String cn = System.getProperty(EVALUATOR_SYSTEM_PROPERTY, DEFAULT_EVALUATOR_CLASS_NAME); + Constructor m = null; + try { + try { + m = getXPathEvaluatorConstructor(cn); + } catch (Throwable e) { + log.warn("Invalid "+XPathEvaluator.class.getName()+" implementation: "+cn+", reason: "+e,e); + cn = DEFAULT_EVALUATOR_CLASS_NAME; + try { + m = getXPathEvaluatorConstructor(cn); + } catch (Throwable e2) { + log.error("Default XPath evaluator could not be loaded",e); + } + } + } finally { + EVALUATOR_CONSTRUCTOR = m; + } + } + + private static Constructor getXPathEvaluatorConstructor(String cn) throws ClassNotFoundException, SecurityException, NoSuchMethodException { + Class c = XPathExpression.class.getClassLoader().loadClass(cn); + if( !XPathEvaluator.class.isAssignableFrom(c) ) { + throw new ClassCastException(""+c+" is not an instance of "+XPathEvaluator.class); + } + return c.getConstructor(new Class[]{String.class}); + } + + private final String xpath; + private final XPathEvaluator evaluator; + + static public interface XPathEvaluator { + public boolean evaluate(AMQMessage message) throws JMSException; + } + + XPathExpression(String xpath) { + this.xpath = xpath; + this.evaluator = createEvaluator(xpath); + } + + private XPathEvaluator createEvaluator(String xpath2) { + try { + return (XPathEvaluator)EVALUATOR_CONSTRUCTOR.newInstance(new Object[]{xpath}); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if( cause instanceof RuntimeException ) { + throw (RuntimeException)cause; + } + throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e); + } catch (Throwable e) { + throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e); + } + } + + public Object evaluate(AMQMessage message) throws JMSException { +// try { +//FIXME this is flow to disk work +// if( message.isDropped() ) +// return null; + return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE; +// } catch (IOException e) { +// +// JMSException exception = new JMSException(e.getMessage()); +// exception.initCause(e); +// throw exception; +// +// } + + } + + public String toString() { + return "XPATH "+ConstantExpression.encodeString(xpath); + } + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java new file mode 100644 index 0000000000..53764cbf75 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java @@ -0,0 +1,56 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.queue.AMQMessage; + +import javax.jms.JMSException; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +/** + * Used to evaluate an XQuery Expression in a JMS selector. + */ +public final class XQueryExpression implements BooleanExpression { + private final String xpath; + + XQueryExpression(String xpath) { + super(); + this.xpath = xpath; + } + + public Object evaluate(AMQMessage message) throws JMSException { + return Boolean.FALSE; + } + + public String toString() { + return "XQUERY "+ConstantExpression.encodeString(xpath); + } + + /** + * @param message + * @return true if the expression evaluates to Boolean.TRUE. + * @throws JMSException + */ + public boolean matches(AMQMessage message) throws JMSException { + Object object = evaluate(message); + return object!=null && object==Boolean.TRUE; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java new file mode 100644 index 0000000000..4b78fd18df --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.server.filter; +// +// Based on like named file from r450141 of the Apache ActiveMQ project +// + +import java.io.StringReader; +import java.io.ByteArrayInputStream; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +//import org.apache.activemq.command.Message; +//import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.xpath.CachedXPathAPI; +import org.apache.qpid.server.queue.AMQMessage; +import org.w3c.dom.Document; +import org.w3c.dom.traversal.NodeIterator; +import org.xml.sax.InputSource; + +public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator { + + private final String xpath; + + public XalanXPathEvaluator(String xpath) { + this.xpath = xpath; + } + + public boolean evaluate(AMQMessage m) throws JMSException { + if( m instanceof TextMessage ) { + String text = ((TextMessage)m).getText(); + return evaluate(text); + } else if ( m instanceof BytesMessage ) { + BytesMessage bm = (BytesMessage) m; + byte data[] = new byte[(int) bm.getBodyLength()]; + bm.readBytes(data); + return evaluate(data); + } + return false; + } + + private boolean evaluate(byte[] data) { + try { + + InputSource inputSource = new InputSource(new ByteArrayInputStream(data)); + + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + DocumentBuilder dbuilder = factory.newDocumentBuilder(); + Document doc = dbuilder.parse(inputSource); + + CachedXPathAPI cachedXPathAPI = new CachedXPathAPI(); + NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath); + return iterator.nextNode()!=null; + + } catch (Throwable e) { + return false; + } + } + + private boolean evaluate(String text) { + try { + InputSource inputSource = new InputSource(new StringReader(text)); + + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + DocumentBuilder dbuilder = factory.newDocumentBuilder(); + Document doc = dbuilder.parse(inputSource); + + // We should associated the cachedXPathAPI object with the message being evaluated + // since that should speedup subsequent xpath expressions. + CachedXPathAPI cachedXPathAPI = new CachedXPathAPI(); + NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath); + return iterator.nextNode()!=null; + } catch (Throwable e) { + return false; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index d4c94061a0..c4c995540d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -74,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener _tokens = new HashSet(); private AMQProtocolSession _publisher; - private final BasicPublishBody _publishBody; + private final BasicPublishBody _publishBody; private ContentHeaderBody _contentHeaderBody; @@ -83,6 +88,7 @@ public class AMQMessage * messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; + private ConcurrentHashMap _decodedMessages; public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) @@ -96,17 +102,19 @@ public class AMQMessage _publishBody = publishBody; _store = messageStore; _contentBodies = new LinkedList(); + _decodedMessages = new ConcurrentHashMap(); _storeWhenComplete = storeWhenComplete; } public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List contentBodies) throws AMQException - + { _publishBody = publishBody; _contentHeaderBody = contentHeaderBody; _contentBodies = contentBodies; + _decodedMessages = new ConcurrentHashMap(); _messageId = messageId; _store = store; storeMessage(); @@ -116,7 +124,7 @@ public class AMQMessage ContentHeaderBody contentHeaderBody, List contentBodies) throws AMQException { - this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies); + this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies); } protected AMQMessage(AMQMessage msg) throws AMQException @@ -270,7 +278,7 @@ public class AMQMessage { _store.removeMessage(_messageId); } - catch(AMQException e) + catch (AMQException e) { //to maintain consistency, we revert the count incrementReference(); @@ -291,7 +299,7 @@ public class AMQMessage public boolean checkToken(Object token) { - if(_tokens.contains(token)) + if (_tokens.contains(token)) { return true; } @@ -307,7 +315,7 @@ public class AMQMessage //if the message is not persistent or the queue is not durable //we will not need to recover the association and so do not //need to record it - if(isPersistent() && queue.isDurable()) + if (isPersistent() && queue.isDurable()) { _store.enqueueMessage(queue.getName(), _messageId); } @@ -317,7 +325,7 @@ public class AMQMessage { //only record associations where both queue and message will survive //a restart, so only need to remove association if this is the case - if(isPersistent() && queue.isDurable()) + if (isPersistent() && queue.isDurable()) { _store.dequeueMessage(queue.getName(), _messageId); } @@ -325,14 +333,14 @@ public class AMQMessage public boolean isPersistent() throws AMQException { - if(_contentHeaderBody == null) + if (_contentHeaderBody == null) { throw new AMQException("Cannot determine delivery mode of message. Content header not found."); } //todo remove literal values to a constant file such as AMQConstants in common return _contentHeaderBody.properties instanceof BasicContentHeaderProperties - &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; + && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2; } public void setTxnBuffer(TxnBuffer buffer) @@ -346,13 +354,15 @@ public class AMQMessage } /** - * Called to enforce the 'immediate' flag. + * Called to enforce the 'immediate' flag. + * * @throws NoConsumersException if the message is marked for - * immediate delivery but has not been marked as delivered to a - * consumer + * immediate delivery but has not been marked as delivered to a + * consumer */ - public void checkDeliveredToConsumer() throws NoConsumersException{ - if(isImmediate() && !_deliveredToConsumer) + public void checkDeliveredToConsumer() throws NoConsumersException + { + if (isImmediate() && !_deliveredToConsumer) { throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies); } @@ -362,7 +372,43 @@ public class AMQMessage * Called when this message is delivered to a consumer. (used to * implement the 'immediate' flag functionality). */ - public void setDeliveredToConsumer(){ + public void setDeliveredToConsumer() + { _deliveredToConsumer = true; } + + + public MessageDecorator getDecodedMessage(String type) + { + MessageDecorator msgtype = null; + + if (_decodedMessages != null) + { + msgtype = _decodedMessages.get(type); + + if (msgtype == null) + { + msgtype = decorateMessage(type); + } + } + + return msgtype; + } + + private MessageDecorator decorateMessage(String type) + { + MessageDecorator msgdec = null; + + if (type.equals(JMS_MESSAGE)) + { + msgdec = new JMSMessage(this); + } + + if (msgdec != null) + { + _decodedMessages.put(type, msgdec); + } + + return msgdec; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 353a2007c0..f734edcc7b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -141,8 +142,8 @@ public class AMQQueue implements Managable, Comparable // OpenMBean data types for viewMessageContent method private CompositeType _msgContentType = null; - private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"}; - private OpenType[] _msgContentAttributeTypes = new OpenType[4]; + private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"}; + private OpenType[] _msgContentAttributeTypes = new OpenType[4]; @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean() throws JMException @@ -162,14 +163,14 @@ public class AMQQueue implements Managable, Comparable _msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes, - _msgContentAttributes, _msgContentAttributeTypes); + _msgContentAttributes, _msgContentAttributeTypes); _msgAttributeTypes[0] = SimpleType.LONG; // For message id _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes _msgAttributeTypes[2] = SimpleType.LONG; // For size _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered - _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); + _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes); _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex); } @@ -265,7 +266,7 @@ public class AMQQueue implements Managable, Comparable { queueDepth = queueDepth + getMessageSize(message); } - return (long)Math.round(queueDepth / 1000); + return (long) Math.round(queueDepth / 1000); } /** @@ -314,7 +315,7 @@ public class AMQQueue implements Managable, Comparable private void notifyClients(String notificationMsg) { Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, - ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); + ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg); _broadcaster.sendNotification(n); } @@ -361,7 +362,7 @@ public class AMQQueue implements Managable, Comparable if (msg == null) { - throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName ); + throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } // get message content List cBodies = msg.getContentBodies(); @@ -379,7 +380,7 @@ public class AMQQueue implements Managable, Comparable } // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties; + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties; String mimeType = headerProperties.getContentType(); String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding(); Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])}; @@ -402,12 +403,12 @@ public class AMQQueue implements Managable, Comparable TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType); // Create the tabular list of message header contents - for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) + for (int i = beginIndex; i <= endIndex && i <= list.size(); i++) { AMQMessage msg = list.get(i - 1); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); + ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties; + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; List headerAttribsList = new ArrayList(); headerAttribsList.add("App Id=" + headerProperties.getAppId()); headerAttribsList.add("MimeType=" + headerProperties.getContentType()); @@ -430,7 +431,7 @@ public class AMQQueue implements Managable, Comparable @Override public MBeanNotificationInfo[] getNotificationInfo() { - String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; + String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED}; String name = MonitorNotification.class.getName(); String description = "Either Message count or Queue depth or Message size has reached threshold high value"; MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description); @@ -581,12 +582,12 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks) + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters); _subscribers.addSubscriber(subscription); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index dfc16a7c71..35ac9a13ac 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.filter.MessageFilter; public interface Subscription { @@ -29,4 +30,8 @@ public interface Subscription boolean isSuspended(); void queueDeleted(AMQQueue queue); + + boolean hasFilters(); + + boolean hasInterest(AMQMessage msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index 0fd44e4fbc..f464384562 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.FieldTable; /** * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This @@ -32,6 +33,9 @@ import org.apache.qpid.AMQException; */ public interface SubscriptionFactory { + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) + throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 5cad28b80d..a0d86deb19 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -26,7 +26,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; /** @@ -52,25 +55,38 @@ public class SubscriptionImpl implements Subscription * True if messages need to be acknowledged */ private final boolean _acks; + private FilterManager _filters; public static class Factory implements SubscriptionFactory { + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException + { + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters); + } + public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null); } } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, String consumerTag, boolean acks) throws AMQException + { + this(channelId, protocolSession, consumerTag, acks, null); + } + + public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, + String consumerTag, boolean acks, FieldTable filters) + throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); if (channel == null) @@ -83,14 +99,9 @@ public class SubscriptionImpl implements Subscription this.consumerTag = consumerTag; sessionKey = protocolSession.getKey(); _acks = acks; + _filters = FilterManagerFactory.createManager(filters); } - public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, - String consumerTag) - throws AMQException - { - this(channel, protocolSession, consumerTag, false); - } public boolean equals(Object o) { @@ -131,7 +142,7 @@ public class SubscriptionImpl implements Subscription { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. - + // By doing this _before_ the send we ensure that it // doesn't get sent if it can't be dequeued, preventing // duplicate delivery on recovery. @@ -178,6 +189,16 @@ public class SubscriptionImpl implements Subscription channel.queueDeleted(queue); } + public boolean hasFilters() + { + return _filters != null; + } + + public boolean hasInterest(AMQMessage msg) + { + return _filters.allAllow(msg); + } + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 7cc3f5f719..a8f778244e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + import java.util.List; import java.util.ListIterator; import java.util.concurrent.CopyOnWriteArrayList; @@ -58,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Remove the subscription, returning it if it was found + * * @param subscription * @return null if no match was found */ @@ -90,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Return the next unsuspended subscription or null if not found. - * + *

* Performance note: * This method can scan all items twice when looking for a subscription that is not * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this @@ -105,29 +108,59 @@ class SubscriptionSet implements WeightedSubscriptionManager return null; } - try { - final Subscription result = nextSubscriber(); - if (result == null) { + try + { + final Subscription result = nextSubscriberImpl(msg); + if (result == null) + { _currentSubscriber = 0; - return nextSubscriber(); - } else { + return nextSubscriberImpl(msg); + } + else + { return result; } - } catch (IndexOutOfBoundsException e) { + } + catch (IndexOutOfBoundsException e) + { _currentSubscriber = 0; - return nextSubscriber(); + return nextSubscriber(msg); } } - private Subscription nextSubscriber() + private Subscription nextSubscriberImpl(AMQMessage msg) { final ListIterator iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) { + while (iterator.hasNext()) + { Subscription subscription = iterator.next(); ++_currentSubscriber; subscriberScanned(); - if (!subscription.isSuspended()) { - return subscription; + + if (!subscription.isSuspended()) + { + + if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg))) + { + return subscription; + } + // 2006-12-04 : It is fairer to simply skip the person who isn't interested. + // Although it does need to be looked at again. + +// else +// { +// //Don't take penalise a subscriber for not wanting this message. +// // This would introduce unfairness sticking with the current subscriber +// // will allow the next message to match.. although could lead to unfairness if: +// // subscribers: a(bin) b(text) c(text) +// // msgs : 1(text) 2(text) 3(bin) +// // subscriber c won't get any messages. as the first two text msgs will go to b and then a will get +// // the bin msg. +// // Never said this was fair round-robin-ing. +// //FIXME - Make a fair round robin. +// +// --_currentSubscriber; +// } } } return null; @@ -149,7 +182,10 @@ class SubscriptionSet implements WeightedSubscriptionManager { for (Subscription s : _subscriptions) { - if (!s.isSuspended()) return true; + if (!s.isSuspended()) + { + return true; + } } return false; } @@ -159,7 +195,10 @@ class SubscriptionSet implements WeightedSubscriptionManager int count = 0; for (Subscription s : _subscriptions) { - if (!s.isSuspended()) count++; + if (!s.isSuspended()) + { + count++; + } } return count; } @@ -167,9 +206,10 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Notification that a queue has been deleted. This is called so that the subscription can inform the * channel, which in turn can update its list of unacknowledged messages. + * * @param queue */ - public void queueDeleted(AMQQueue queue) + public void queueDeleted(AMQQueue queue) throws AMQException { for (Subscription s : _subscriptions) { @@ -177,7 +217,8 @@ class SubscriptionSet implements WeightedSubscriptionManager } } - int size() { + int size() + { return _subscriptions.size(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index bfd294f09e..885f7647bc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -829,14 +829,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQDestination amqd = (AMQDestination) destination; final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler(); - // TODO: construct the rawSelector from the selector string if rawSelector == null + final FieldTable ft = FieldTableFactory.newFieldTable(); - //if (rawSelector != null) - // ft.put("headers", rawSelector.getDataAsBytes()); + + // Add headers for headers exchange if (rawSelector != null) { ft.putAll(rawSelector); } + + BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, @@ -915,6 +917,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi protocolHandler.writeFrame(queueBind); } +// /** +// * Register to consume from the queue. +// * +// * @param queueName +// * @return the consumer tag generated by the broker +// */ +// private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, +// boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException +// { +// return consumeFromQueue(queueName, protocolHandler, prefetchHigh, prefetchLow, noLocal, exclusive, acknowledgeMode, null); +// } + /** * Register to consume from the queue. * @@ -922,16 +936,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @return the consumer tag generated by the broker */ private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, - boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException + boolean noLocal, boolean exclusive, int acknowledgeMode, String messageSelector) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); + FieldTable ft = new FieldTable(); + + if (messageSelector != null) + { + //fixme move literal value to a common class. + ft.put("x-filter-jms-selector", messageSelector); + } + AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, queueName, tag, noLocal, acknowledgeMode == Session.NO_ACKNOWLEDGE, - exclusive, true); + exclusive, true, ft); + protocolHandler.writeFrame(jmsConsume); return tag; @@ -1218,11 +1241,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), - consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode()); + String consumerTag = null; + try + { + consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), + consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode(), + consumer.getMessageSelector()); + + consumer.setConsumerTag(consumerTag); + _consumers.put(consumerTag, consumer); + } + catch (JMSException e) + { + // getMessageSelector throws JMSEx but it is simply a string return so won't happen. + } + - consumer.setConsumerTag(consumerTag); - _consumers.put(consumerTag, consumer); } /** diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index fbb55ae289..aaf0320afb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -58,7 +58,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms { _data.acquire(); } - _readableProperties = false; + // ContentHeaderProperties are just created and so are empty + //_readableProperties = (_contentHeaderProperties != null); _readableMessage = (data != null); } @@ -424,7 +425,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms buf.append("\nJMS priority: ").append(getJMSPriority()); buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode()); buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo())); + buf.append("\nJMS Type: ").append(String.valueOf(getJMSType())); + buf.append("\nJMS CorrelationID: ").append(String.valueOf(getJMSCorrelationID())); + buf.append("\nJMS Destination: NOT IMPLEMENTED");//.append(String.valueOf(getJMSDestination())); + buf.append("\nJMS MessageID: ").append(String.valueOf(getJMSMessageID())); + buf.append("\nJMS Redelivered: ").append(String.valueOf(getJMSRedelivered())); + buf.append("\nProperty Names: ").append(String.valueOf(getPropertyNames())); + buf.append("\nAMQ message number: ").append(_deliveryTag); + buf.append("\nProperties:"); if (getJmsContentHeaderProperties().getHeaders().isEmpty()) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java new file mode 100644 index 0000000000..bccf5b4ccd --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.test.VMBrokerSetup; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import javax.jms.*; + +import junit.framework.TestCase; +import junit.framework.Assert; + +public class FieldTableInTextMessageTest extends TestCase implements MessageListener +{ + private final static Logger _logger = org.apache.log4j.Logger.getLogger(FieldTableInTextMessageTest.class); + + private AMQConnection _connection; + private Destination _destination; + private AMQSession _session; + private Message original_message = null; + private Message received_message = null; + public String _connectionString = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + try + { + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + } + catch (Exception e) + { + fail("Unable to initialilse connection: " + e); + } + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + private void init(AMQConnection connection) throws Exception + { + Destination destination = new AMQQueue(randomize("TextMessageTest"), true); + init(connection, destination); + } + + private void init(AMQConnection connection, Destination destination) throws Exception + { + _connection = connection; + _destination = destination; + _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + + //set up a slow consumer + _session.createConsumer(destination).setMessageListener(this); + connection.start(); + } + + public void test() throws Exception + { + send(); + waitFor(); + check(); + System.out.println("Completed without failure"); + _connection.close(); + } + + void send() throws JMSException + { + //create a publisher + MessageProducer producer = _session.createProducer(_destination); + Message message = _session.createTextMessage("Message Body"); + message.setBooleanProperty("boolan", true); + message.setByteProperty("byte", Byte.MAX_VALUE); + message.setDoubleProperty("double", Double.MAX_VALUE); + message.setFloatProperty("float", Float.MAX_VALUE); + message.setIntProperty("int", Integer.MAX_VALUE); + message.setLongProperty("long", Long.MAX_VALUE); + message.setShortProperty("short", Short.MAX_VALUE); + message.setStringProperty("String", "String"); + + + original_message = message; + _logger.info("Sending Message:" + message); + producer.send(message); + + } + + void waitFor() throws InterruptedException + { + synchronized(received_message) + { + received_message.wait(); + } + } + + void check() throws JMSException + { + _logger.info("Received Message:" + received_message); + assertEqual(original_message, received_message); + } + + private static void assertEqual(Message expected, Message actual) + { + _logger.info("Expected:" + expected); + _logger.info("Actual:" + actual); + } + + public void onMessage(Message message) + { + synchronized(received_message) + { + received_message = message; + received_message.notify(); + } + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } + + public static void main(String[] argv) throws Exception + { + FieldTableInTextMessageTest test = new FieldTableInTextMessageTest(); + test._connectionString = argv.length == 0 ? "vm://:1" : argv[0]; + test.setUp(); + test.test(); + } + + public static junit.framework.Test suite() + { + return new VMBrokerSetup(new junit.framework.TestSuite(FieldTableInTextMessageTest.class)); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java new file mode 100644 index 0000000000..28b0a1c2f7 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.test.VMBrokerSetup; +import org.apache.log4j.Logger; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.DeliveryMode; + +import junit.framework.TestCase; + +public class SelectorTest extends TestCase implements MessageListener +{ + + private final static Logger _logger = org.apache.log4j.Logger.getLogger(SelectorTest.class); + + private AMQConnection _connection; + private AMQDestination _destination; + private AMQSession _session; + private int count; + public String _connectionString = "vm://:1"; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + private void init(AMQConnection connection) throws Exception + { + init(connection, new AMQQueue(randomize("SessionStartTest"), true)); + } + + private void init(AMQConnection connection, AMQDestination destination) throws Exception + { + _connection = connection; + _destination = destination; + connection.start(); + + + String selector= null; +// selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; +// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; + + _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + //_session.createConsumer(destination).setMessageListener(this); + _session.createConsumer(destination, selector).setMessageListener(this); + } + + public synchronized void test() throws JMSException, InterruptedException + { + try + { + Message msg = _session.createTextMessage("Message"); + msg.setJMSPriority(1); + msg.setIntProperty("Cost", 2); + msg.setJMSType("Special"); + + _logger.info("Sending Message:" + msg); + + ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT); + System.out.println("Message sent, waiting for response..."); + wait(1000); + + if (count > 0) + { + _logger.info("Got message"); + } + + if (count == 0) + { + fail("Did not get message!"); + //throw new RuntimeException("Did not get message!"); + } + } + finally + { + _session.close(); + _connection.close(); + } + } + + public synchronized void onMessage(Message message) + { + count++; + _logger.info("Got Message:" + message); + notify(); + } + + private static String randomize(String in) + { + return in + System.currentTimeMillis(); + } + + public static void main(String[] argv) throws Exception + { + SelectorTest test = new SelectorTest(); + test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0]; + test.setUp(); + test.test(); + } + + public static junit.framework.Test suite() + { + return new VMBrokerSetup(new junit.framework.TestSuite(SelectorTest.class)); + } +} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index 0268ff2171..1394e7e20b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -88,9 +88,19 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage public void queueDeleted(AMQQueue queue) { - if(queue instanceof ClusteredQueue) + if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer); } } + + public boolean hasFilters() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } } diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java index 2121346c02..f8c621e413 100644 --- a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java +++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java @@ -64,9 +64,9 @@ public class URLHelper if (valueIndex + 1 < options.length()) { if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR || - options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR || - options.charAt(valueIndex + 1) == BROKER_SEPARATOR || - options.charAt(valueIndex + 1) == '\'') + options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR || + options.charAt(valueIndex + 1) == BROKER_SEPARATOR || + options.charAt(valueIndex + 1) == '\'') { nestedQuotes--; // System.out.println( @@ -119,7 +119,7 @@ public class URLHelper else { parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" + - options.charAt(sepIndex) + "'", options); + options.charAt(sepIndex) + "'", options); } } diff --git a/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java index 3ff7195794..5cac2505a8 100644 --- a/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java +++ b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java @@ -62,12 +62,12 @@ public class URLSyntaxException extends URISyntaxException if (getIndex() > -1) { - if (_length != -1) + if (_length > 1) { sb.append(" between indicies "); sb.append(getIndex()); sb.append(" and "); - sb.append(_length); + sb.append(getIndex() + _length); } else { diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java index de6df4bc03..2de1a0fe69 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java @@ -70,6 +70,16 @@ public class TestSubscription implements Subscription { } + public boolean hasFilters() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } + public int hashCode() { return key.hashCode(); diff --git a/specs/amqp-8.0.xml b/specs/amqp-8.0.xml index 6b3a438f08..6a91263a99 100644 --- a/specs/amqp-8.0.xml +++ b/specs/amqp-8.0.xml @@ -2085,6 +2085,13 @@ localised reply text method it will raise a channel or connection exception. + + + + A set of filters for the consume. The syntax and semantics + of these filters depends on the providers implementation. + + @@ -2446,9 +2453,9 @@ localised reply text A client MUST NOT use this method as a means of selecting messages to process. A rejected message MAY be discarded or dead-lettered, not necessarily passed to another client. - + - + @@ -2490,7 +2497,7 @@ localised reply text The server MUST set the redelivered flag on all messages that are resent. - The server MUST raise a channel exception if this is called on a + The server MUST raise a channel exception if this is called on a transacted channel. @@ -2792,7 +2799,7 @@ localised reply text - + staging identifier @@ -2829,7 +2836,7 @@ localised reply text - + already staged amount @@ -3045,7 +3052,7 @@ localised reply text - + acknowledge multiple messages @@ -3084,7 +3091,7 @@ localised reply text not necessarily passed to another client. - + @@ -3483,7 +3490,7 @@ localised reply text Specifies the routing key name specified when the message was published. - + -- cgit v1.2.1