summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-06 15:55:10 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-06 15:55:10 +0000
commit539571e411b4071a24e0568bc21d01eeae9706c6 (patch)
tree4c171782e0b1fba8d9daf62622a46bed9342158a
parentf5fa80f64c15209f90a05b0c5ce068c71a8acfb8 (diff)
downloadqpid-python-539571e411b4071a24e0568bc21d01eeae9706c6.tar.gz
QPID-21
Broker Pom - Added geronimo jms jar - and javacc plugin for selectors. AMQMessage.java - Added decorator to extract JMS information. AMQQueue.java - Mainly whitespace - added filter argument to session registration Subscription.java - Added hasFilters and isInterested to enable the filtration process SubscriptionFactory.java - new constructor that takes the FieldTable of filters. {Test|Remote}Subscription{Impl}.java - Implementation of Subscription{Factory}.java features. SubscriptionSet.java - White space and added hasFilter/isInterested calls to determine nextSubscriber. AMQSession.java - Create FilterTable for basic.consume. AbstractJMSMessage.java - Updated println to provide further jms header details. URLHelper.java - whitespace URLSyntaxException.java - tidied up output amqp-8.0.xml - augmented consume to have argument field table Server/filter - Filtration classes from ActiveMQ and new FilterManager.java to drive them. message/jms - Decorator implementation MessageDecorator.java - Handles decoration of AMQMessages - currently hardwired but could be used to scan classpath. SelectorTest.java - Simple test to see that selectors work. Not exhaustive. FieldTableInTextMessageTest.java - Test that the Fieldtable (properties)in a TextMessage work as expected. (QPID-9) Reinvestigate wrt QPID-158 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/jmsselectors@483127 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/pom.xml24
-rw-r--r--java/broker/src/main/grammar/SelectorParser.jj598
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java216
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java465
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java170
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java67
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java82
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java95
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java305
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java263
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java130
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java99
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java288
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java76
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java77
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java52
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java160
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java140
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/URLHelper.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java10
-rw-r--r--specs/amqp-8.0.xml23
38 files changed, 3780 insertions, 84 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index 5a2c699cc1..f6dd171214 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -53,6 +53,10 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
@@ -81,6 +85,26 @@
<build>
<plugins>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.0</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+ <outputDirectory>${basedir}/target/generated</outputDirectory>
+ <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+ </configuration>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj
new file mode 100644
index 0000000000..5553a46e47
--- /dev/null
+++ b/java/broker/src/main/grammar/SelectorParser.jj
@@ -0,0 +1,598 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+ //
+ // Original File from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+ //
+
+// ----------------------------------------------------------------------------
+// OPTIONS
+// ----------------------------------------------------------------------------
+options {
+ STATIC = false;
+ UNICODE_INPUT = true;
+
+ // some performance optimizations
+ OPTIMIZE_TOKEN_MANAGER = true;
+ ERROR_REPORTING = false;
+}
+
+// ----------------------------------------------------------------------------
+// PARSER
+// ----------------------------------------------------------------------------
+
+PARSER_BEGIN(SelectorParser)
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.filter.jms.selector;
+
+import java.io.*;
+import java.util.*;
+
+import javax.jms.InvalidSelectorException;
+
+import org.apache.qpid.server.filter.*;
+
+/**
+ * JMS Selector Parser generated by JavaCC
+ *
+ * Do not edit this .java file directly - it is autogenerated from SelectorParser.jj
+ */
+public class SelectorParser {
+
+ public SelectorParser() {
+ this(new StringReader(""));
+ }
+
+ public BooleanExpression parse(String sql) throws InvalidSelectorException {
+ this.ReInit(new StringReader(sql));
+
+ try {
+ return this.JmsSelector();
+ }
+ catch (Throwable e) {
+ throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e);
+ }
+
+ }
+
+ private BooleanExpression asBooleanExpression(Expression value) throws ParseException {
+ if (value instanceof BooleanExpression) {
+ return (BooleanExpression) value;
+ }
+ if (value instanceof PropertyExpression) {
+ return UnaryExpression.createBooleanCast( value );
+ }
+ throw new ParseException("Expression will not result in a boolean value: " + value);
+ }
+
+
+}
+
+PARSER_END(SelectorParser)
+
+// ----------------------------------------------------------------------------
+// Tokens
+// ----------------------------------------------------------------------------
+
+/* White Space */
+SPECIAL_TOKEN :
+{
+ " " | "\t" | "\n" | "\r" | "\f"
+}
+
+/* Comments */
+SKIP:
+{
+ <LINE_COMMENT: "--" (~["\n","\r"])* ("\n"|"\r"|"\r\n") >
+}
+
+SKIP:
+{
+ <BLOCK_COMMENT: "/*" (~["*"])* "*" ("*" | (~["*","/"] (~["*"])* "*"))* "/">
+}
+
+/* Reserved Words */
+TOKEN [IGNORE_CASE] :
+{
+ < NOT : "NOT">
+ | < AND : "AND">
+ | < OR : "OR">
+ | < BETWEEN : "BETWEEN">
+ | < LIKE : "LIKE">
+ | < ESCAPE : "ESCAPE">
+ | < IN : "IN">
+ | < IS : "IS">
+ | < TRUE : "TRUE" >
+ | < FALSE : "FALSE" >
+ | < NULL : "NULL" >
+ | < XPATH : "XPATH" >
+ | < XQUERY : "XQUERY" >
+}
+
+/* Literals */
+TOKEN [IGNORE_CASE] :
+{
+
+ < DECIMAL_LITERAL: ["1"-"9"] (["0"-"9"])* (["l","L"])? >
+ | < HEX_LITERAL: "0" ["x","X"] (["0"-"9","a"-"f","A"-"F"])+ >
+ | < OCTAL_LITERAL: "0" (["0"-"7"])* >
+ | < FLOATING_POINT_LITERAL:
+ (["0"-"9"])+ "." (["0"-"9"])* (<EXPONENT>)? // matches: 5.5 or 5. or 5.5E10 or 5.E10
+ | "." (["0"-"9"])+ (<EXPONENT>)? // matches: .5 or .5E10
+ | (["0"-"9"])+ <EXPONENT> // matches: 5E10
+ >
+ | < #EXPONENT: "E" (["+","-"])? (["0"-"9"])+ >
+ | < STRING_LITERAL: "'" ( ("''") | ~["'"] )* "'" >
+}
+
+TOKEN [IGNORE_CASE] :
+{
+ < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+}
+
+// ----------------------------------------------------------------------------
+// Grammer
+// ----------------------------------------------------------------------------
+BooleanExpression JmsSelector() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = orExpression()
+ )
+ {
+ return asBooleanExpression(left);
+ }
+
+}
+
+Expression orExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = andExpression()
+ (
+ <OR> right = andExpression()
+ {
+ left = LogicExpression.createOR(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+
+}
+
+
+Expression andExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = equalityExpression()
+ (
+ <AND> right = equalityExpression()
+ {
+ left = LogicExpression.createAND(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression equalityExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = comparisonExpression()
+ (
+
+ "=" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createEqual(left, right);
+ }
+ |
+ "<>" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createNotEqual(left, right);
+ }
+ |
+ LOOKAHEAD(2)
+ <IS> <NULL>
+ {
+ left = ComparisonExpression.createIsNull(left);
+ }
+ |
+ <IS> <NOT> <NULL>
+ {
+ left = ComparisonExpression.createIsNotNull(left);
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression comparisonExpression() :
+{
+ Expression left;
+ Expression right;
+ Expression low;
+ Expression high;
+ String t, u;
+ boolean not;
+ ArrayList list;
+}
+{
+ (
+ left = addExpression()
+ (
+
+ ">" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThan(left, right);
+ }
+ |
+ ">=" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThanEqual(left, right);
+ }
+ |
+ "<" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThan(left, right);
+ }
+ |
+ "<=" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThanEqual(left, right);
+ }
+ |
+ {
+ u=null;
+ }
+ <LIKE> t = stringLitteral()
+ [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createLike(left, t, u);
+ }
+ |
+ LOOKAHEAD(2)
+ {
+ u=null;
+ }
+ <NOT> <LIKE> t = stringLitteral() [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createNotLike(left, t, u);
+ }
+ |
+ <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createBetween(left, low, high);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createNotBetween(left, low, high);
+ }
+ |
+ <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createInFilter(left, list);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createNotInFilter(left, list);
+ }
+
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression addExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = multExpr()
+ (
+ LOOKAHEAD( ("+"|"-") multExpr())
+ (
+ "+" right = multExpr()
+ {
+ left = ArithmeticExpression.createPlus(left, right);
+ }
+ |
+ "-" right = multExpr()
+ {
+ left = ArithmeticExpression.createMinus(left, right);
+ }
+ )
+
+ )*
+ {
+ return left;
+ }
+}
+
+Expression multExpr() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = unaryExpr()
+ (
+ "*" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMultiply(left, right);
+ }
+ |
+ "/" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createDivide(left, right);
+ }
+ |
+ "%" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMod(left, right);
+ }
+
+ )*
+ {
+ return left;
+ }
+}
+
+
+Expression unaryExpr() :
+{
+ String s=null;
+ Expression left=null;
+}
+{
+ (
+ LOOKAHEAD( "+" unaryExpr() )
+ "+" left=unaryExpr()
+ |
+ "-" left=unaryExpr()
+ {
+ left = UnaryExpression.createNegate(left);
+ }
+ |
+ <NOT> left=unaryExpr()
+ {
+ left = UnaryExpression.createNOT( asBooleanExpression(left) );
+ }
+ |
+ <XPATH> s=stringLitteral()
+ {
+ left = UnaryExpression.createXPath( s );
+ }
+ |
+ <XQUERY> s=stringLitteral()
+ {
+ left = UnaryExpression.createXQuery( s );
+ }
+ |
+ left = primaryExpr()
+ )
+ {
+ return left;
+ }
+
+}
+
+Expression primaryExpr() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = literal()
+ |
+ left = variable()
+ |
+ "(" left = orExpression() ")"
+ )
+ {
+ return left;
+ }
+}
+
+
+
+ConstantExpression literal() :
+{
+ Token t;
+ String s;
+ ConstantExpression left=null;
+}
+{
+ (
+ (
+ s = stringLitteral()
+ {
+ left = new ConstantExpression(s);
+ }
+ )
+ |
+ (
+ t = <DECIMAL_LITERAL>
+ {
+ left = ConstantExpression.createFromDecimal(t.image);
+ }
+ )
+ |
+ (
+ t = <HEX_LITERAL>
+ {
+ left = ConstantExpression.createFromHex(t.image);
+ }
+ )
+ |
+ (
+ t = <OCTAL_LITERAL>
+ {
+ left = ConstantExpression.createFromOctal(t.image);
+ }
+ )
+ |
+ (
+ t = <FLOATING_POINT_LITERAL>
+ {
+ left = ConstantExpression.createFloat(t.image);
+ }
+ )
+ |
+ (
+ <TRUE>
+ {
+ left = ConstantExpression.TRUE;
+ }
+ )
+ |
+ (
+ <FALSE>
+ {
+ left = ConstantExpression.FALSE;
+ }
+ )
+ |
+ (
+ <NULL>
+ {
+ left = ConstantExpression.NULL;
+ }
+ )
+ )
+ {
+ return left;
+ }
+}
+
+String stringLitteral() :
+{
+ Token t;
+ StringBuffer rc = new StringBuffer();
+ boolean first=true;
+}
+{
+ t = <STRING_LITERAL>
+ {
+ // Decode the sting value.
+ String image = t.image;
+ for( int i=1; i < image.length()-1; i++ ) {
+ char c = image.charAt(i);
+ if( c == '\'' )
+ i++;
+ rc.append(c);
+ }
+ return rc.toString();
+ }
+}
+
+PropertyExpression variable() :
+{
+ Token t;
+ PropertyExpression left=null;
+}
+{
+ (
+ t = <ID>
+ {
+ left = new PropertyExpression(t.image);
+ }
+ )
+ {
+ return left;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a6cb4523cf..270f75e86e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -290,7 +291,7 @@ public class AMQChannel
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -301,7 +302,7 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
new file mode 100644
index 0000000000..0aa5739c1c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
@@ -0,0 +1,216 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.jms.JMSException;
+
+/**
+ * An expression which performs an operation on two expression values
+ *
+ * @version $Revision$
+ */
+public abstract class ArithmeticExpression extends BinaryExpression {
+
+ protected static final int INTEGER = 1;
+ protected static final int LONG = 2;
+ protected static final int DOUBLE = 3;
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ArithmeticExpression(Expression left, Expression right) {
+ super(left, right);
+ }
+
+ public static Expression createPlus(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof String) {
+ String text = (String) lvalue;
+ String answer = text + rvalue;
+ return answer;
+ }
+ else if (lvalue instanceof Number) {
+ return plus((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "+";
+ }
+ };
+ }
+
+ public static Expression createMinus(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return minus((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "-";
+ }
+ };
+ }
+
+ public static Expression createMultiply(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return multiply((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "*";
+ }
+ };
+ }
+
+ public static Expression createDivide(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return divide((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "/";
+ }
+ };
+ }
+
+ public static Expression createMod(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return mod((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "%";
+ }
+ };
+ }
+
+ protected Number plus(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() + right.intValue());
+ case LONG:
+ return new Long(left.longValue() + right.longValue());
+ default:
+ return new Double(left.doubleValue() + right.doubleValue());
+ }
+ }
+
+ protected Number minus(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() - right.intValue());
+ case LONG:
+ return new Long(left.longValue() - right.longValue());
+ default:
+ return new Double(left.doubleValue() - right.doubleValue());
+ }
+ }
+
+ protected Number multiply(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() * right.intValue());
+ case LONG:
+ return new Long(left.longValue() * right.longValue());
+ default:
+ return new Double(left.doubleValue() * right.doubleValue());
+ }
+ }
+
+ protected Number divide(Number left, Number right) {
+ return new Double(left.doubleValue() / right.doubleValue());
+ }
+
+ protected Number mod(Number left, Number right) {
+ return new Double(left.doubleValue() % right.doubleValue());
+ }
+
+ private int numberType(Number left, Number right) {
+ if (isDouble(left) || isDouble(right)) {
+ return DOUBLE;
+ }
+ else if (left instanceof Long || right instanceof Long) {
+ return LONG;
+ }
+ else {
+ return INTEGER;
+ }
+ }
+
+ private boolean isDouble(Number n) {
+ return n instanceof Float || n instanceof Double;
+ }
+
+ protected Number asNumber(Object value) {
+ if (value instanceof Number) {
+ return (Number) value;
+ }
+ else {
+ throw new RuntimeException("Cannot convert value: " + value + " into a number");
+ }
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object lvalue = left.evaluate(message);
+ if (lvalue == null) {
+ return null;
+ }
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ return evaluate(lvalue, rvalue);
+ }
+
+
+ /**
+ * @param lvalue
+ * @param rvalue
+ * @return
+ */
+ abstract protected Object evaluate(Object lvalue, Object rvalue);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
new file mode 100644
index 0000000000..4256ab9189
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+
+/**
+ * An expression which performs an operation on two expression values.
+ *
+ * @version $Revision$
+ */
+abstract public class BinaryExpression implements Expression {
+ protected Expression left;
+ protected Expression right;
+
+ public BinaryExpression(Expression left, Expression right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public Expression getLeft() {
+ return left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ abstract public String getExpressionSymbol();
+
+ /**
+ * @param expression
+ */
+ public void setRight(Expression expression) {
+ right = expression;
+ }
+
+ /**
+ * @param expression
+ */
+ public void setLeft(Expression expression) {
+ left = expression;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
new file mode 100644
index 0000000000..b66de3fbc5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import javax.jms.JMSException;
+
+
+/**
+ * A BooleanExpression is an expression that always
+ * produces a Boolean result.
+ *
+ * @version $Revision$
+ */
+public interface BooleanExpression extends Expression {
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws JMSException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
new file mode 100644
index 0000000000..13d278cf65
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
@@ -0,0 +1,465 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.jms.JMSException;
+
+/**
+ * A filter performing a comparison of two objects
+ *
+ * @version $Revision$
+ */
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression {
+
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right) {
+ return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
+ }
+
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) {
+ return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
+ }
+
+ static final private HashSet REGEXP_CONTROL_CHARS = new HashSet();
+
+ static {
+ REGEXP_CONTROL_CHARS.add(new Character('.'));
+ REGEXP_CONTROL_CHARS.add(new Character('\\'));
+ REGEXP_CONTROL_CHARS.add(new Character('['));
+ REGEXP_CONTROL_CHARS.add(new Character(']'));
+ REGEXP_CONTROL_CHARS.add(new Character('^'));
+ REGEXP_CONTROL_CHARS.add(new Character('$'));
+ REGEXP_CONTROL_CHARS.add(new Character('?'));
+ REGEXP_CONTROL_CHARS.add(new Character('*'));
+ REGEXP_CONTROL_CHARS.add(new Character('+'));
+ REGEXP_CONTROL_CHARS.add(new Character('{'));
+ REGEXP_CONTROL_CHARS.add(new Character('}'));
+ REGEXP_CONTROL_CHARS.add(new Character('|'));
+ REGEXP_CONTROL_CHARS.add(new Character('('));
+ REGEXP_CONTROL_CHARS.add(new Character(')'));
+ REGEXP_CONTROL_CHARS.add(new Character(':'));
+ REGEXP_CONTROL_CHARS.add(new Character('&'));
+ REGEXP_CONTROL_CHARS.add(new Character('<'));
+ REGEXP_CONTROL_CHARS.add(new Character('>'));
+ REGEXP_CONTROL_CHARS.add(new Character('='));
+ REGEXP_CONTROL_CHARS.add(new Character('!'));
+ }
+
+ static class LikeExpression extends UnaryExpression implements BooleanExpression {
+
+ Pattern likePattern;
+
+ /**
+ * @param right
+ */
+ public LikeExpression(Expression right, String like, int escape) {
+ super(right);
+
+ StringBuffer regexp = new StringBuffer(like.length() * 2);
+ regexp.append("\\A"); // The beginning of the input
+ for (int i = 0; i < like.length(); i++) {
+ char c = like.charAt(i);
+ if (escape == (0xFFFF & c)) {
+ i++;
+ if (i >= like.length()) {
+ // nothing left to escape...
+ break;
+ }
+
+ char t = like.charAt(i);
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & t));
+ }
+ else if (c == '%') {
+ regexp.append(".*?"); // Do a non-greedy match
+ }
+ else if (c == '_') {
+ regexp.append("."); // match one
+ }
+ else if (REGEXP_CONTROL_CHARS.contains(new Character(c))) {
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & c));
+ }
+ else {
+ regexp.append(c);
+ }
+ }
+ regexp.append("\\z"); // The end of the input
+
+ likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL);
+ }
+
+ /**
+ * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol()
+ */
+ public String getExpressionSymbol() {
+ return "LIKE";
+ }
+
+ /**
+ * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
+ */
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Object rv = this.getRight().evaluate(message);
+
+ if (rv == null) {
+ return null;
+ }
+
+ if (!(rv instanceof String)) {
+ return Boolean.FALSE;
+ //throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass());
+ }
+
+ return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ }
+
+ public static BooleanExpression createLike(Expression left, String right, String escape) {
+ if (escape != null && escape.length() != 1) {
+ throw new RuntimeException("The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape);
+ }
+ int c = -1;
+ if (escape != null) {
+ c = 0xFFFF & escape.charAt(0);
+ }
+
+ return new LikeExpression(left, right, c);
+ }
+
+ public static BooleanExpression createNotLike(Expression left, String right, String escape) {
+ return UnaryExpression.createNOT(createLike(left, right, escape));
+ }
+
+ public static BooleanExpression createInFilter(Expression left, List elements) {
+
+ if( !(left instanceof PropertyExpression) )
+ throw new RuntimeException("Expected a property for In expression, got: "+left);
+ return UnaryExpression.createInExpression((PropertyExpression)left, elements, false);
+
+ }
+
+ public static BooleanExpression createNotInFilter(Expression left, List elements) {
+
+ if( !(left instanceof PropertyExpression) )
+ throw new RuntimeException("Expected a property for In expression, got: "+left);
+ return UnaryExpression.createInExpression((PropertyExpression)left, elements, true);
+
+ }
+
+ public static BooleanExpression createIsNull(Expression left) {
+ return doCreateEqual(left, ConstantExpression.NULL);
+ }
+
+ public static BooleanExpression createIsNotNull(Expression left) {
+ return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
+ }
+
+ public static BooleanExpression createNotEqual(Expression left, Expression right) {
+ return UnaryExpression.createNOT(createEqual(left, right));
+ }
+
+ public static BooleanExpression createEqual(Expression left, Expression right) {
+ checkEqualOperand(left);
+ checkEqualOperand(right);
+ checkEqualOperandCompatability(left, right);
+ return doCreateEqual(left, right);
+ }
+
+ private static BooleanExpression doCreateEqual(Expression left, Expression right) {
+ return new ComparisonExpression(left, right) {
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object lv = left.evaluate(message);
+ Object rv = right.evaluate(message);
+
+ // Iff one of the values is null
+ if (lv == null ^ rv == null) {
+ return Boolean.FALSE;
+ }
+ if (lv == rv || lv.equals(rv)) {
+ return Boolean.TRUE;
+ }
+ if( lv instanceof Comparable && rv instanceof Comparable ) {
+ return compare((Comparable)lv, (Comparable)rv);
+ }
+ return Boolean.FALSE;
+ }
+
+ protected boolean asBoolean(int answer) {
+ return answer == 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "=";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ protected boolean asBoolean(int answer) {
+ return answer > 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ protected boolean asBoolean(int answer) {
+ return answer >= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">=";
+ }
+ };
+ }
+
+ public static BooleanExpression createLessThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ protected boolean asBoolean(int answer) {
+ return answer < 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<";
+ }
+
+ };
+ }
+
+ public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ protected boolean asBoolean(int answer) {
+ return answer <= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<=";
+ }
+ };
+ }
+
+ /**
+ * Only Numeric expressions can be used in >, >=, < or <= expressions.s
+ *
+ * @param expr
+ */
+ public static void checkLessThanOperand(Expression expr ) {
+ if( expr instanceof ConstantExpression ) {
+ Object value = ((ConstantExpression)expr).getValue();
+ if( value instanceof Number )
+ return;
+
+ // Else it's boolean or a String..
+ throw new RuntimeException("Value '"+expr+"' cannot be compared.");
+ }
+ if( expr instanceof BooleanExpression ) {
+ throw new RuntimeException("Value '"+expr+"' cannot be compared.");
+ }
+ }
+
+ /**
+ * Validates that the expression can be used in == or <> expression.
+ * Cannot not be NULL TRUE or FALSE litterals.
+ *
+ * @param expr
+ */
+ public static void checkEqualOperand(Expression expr ) {
+ if( expr instanceof ConstantExpression ) {
+ Object value = ((ConstantExpression)expr).getValue();
+ if( value == null )
+ throw new RuntimeException("'"+expr+"' cannot be compared.");
+ }
+ }
+
+ /**
+ *
+ * @param left
+ * @param right
+ */
+ private static void checkEqualOperandCompatability(Expression left, Expression right) {
+ if( left instanceof ConstantExpression && right instanceof ConstantExpression ) {
+ if( left instanceof BooleanExpression && !(right instanceof BooleanExpression) )
+ throw new RuntimeException("'"+left+"' cannot be compared with '"+right+"'");
+ }
+ }
+
+
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ComparisonExpression(Expression left, Expression right) {
+ super(left, right);
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Comparable lv = (Comparable) left.evaluate(message);
+ if (lv == null) {
+ return null;
+ }
+ Comparable rv = (Comparable) right.evaluate(message);
+ if (rv == null) {
+ return null;
+ }
+ return compare(lv, rv);
+ }
+
+ protected Boolean compare(Comparable lv, Comparable rv) {
+ Class lc = lv.getClass();
+ Class rc = rv.getClass();
+ // If the the objects are not of the same type,
+ // try to convert up to allow the comparison.
+ if (lc != rc) {
+ if (lc == Byte.class) {
+ if (rc == Short.class) {
+ lv = new Short(((Number) lv).shortValue());
+ }
+ else if (rc == Integer.class) {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Short.class) {
+ if (rc == Integer.class) {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Integer.class) {
+ if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Long.class) {
+ if (rc == Integer.class) {
+ rv = new Long(((Number) rv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Float.class) {
+ if (rc == Integer.class) {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Long.class) {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Double.class) {
+ if (rc == Integer.class) {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Long.class) {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Float.class) {
+ rv = new Float(((Number) rv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else
+ return Boolean.FALSE;
+ }
+ return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ protected abstract boolean asBoolean(int answer);
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
new file mode 100644
index 0000000000..9bde712da2
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
@@ -0,0 +1,170 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import java.math.BigDecimal;
+
+import javax.jms.JMSException;
+
+/**
+ * Represents a constant expression
+ *
+ * @version $Revision$
+ */
+public class ConstantExpression implements Expression {
+
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
+ public BooleanConstantExpression(Object value) {
+ super(value);
+ }
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ }
+
+ public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
+ public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
+ public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
+
+ private Object value;
+
+ public static ConstantExpression createFromDecimal(String text) {
+
+ // Strip off the 'l' or 'L' if needed.
+ if( text.endsWith("l") || text.endsWith("L") )
+ text = text.substring(0, text.length()-1);
+
+ Number value;
+ try {
+ value = new Long(text);
+ } catch ( NumberFormatException e) {
+ // The number may be too big to fit in a long.
+ value = new BigDecimal(text);
+ }
+
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromHex(String text) {
+ Number value = new Long(Long.parseLong(text.substring(2), 16));
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromOctal(String text) {
+ Number value = new Long(Long.parseLong(text, 8));
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFloat(String text) {
+ Number value = new Double(text);
+ return new ConstantExpression(value);
+ }
+
+ public ConstantExpression(Object value) {
+ this.value = value;
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ return value;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ if (value == null) {
+ return "NULL";
+ }
+ if (value instanceof Boolean) {
+ return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
+ }
+ if (value instanceof String) {
+ return encodeString((String) value);
+ }
+ return value.toString();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+
+ /**
+ * Encodes the value of string so that it looks like it would look like
+ * when it was provided in a selector.
+ *
+ * @param s
+ * @return
+ */
+ public static String encodeString(String s) {
+ StringBuffer b = new StringBuffer();
+ b.append('\'');
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c == '\'') {
+ b.append(c);
+ }
+ b.append(c);
+ }
+ b.append('\'');
+ return b.toString();
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
new file mode 100644
index 0000000000..a15c15fb91
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import javax.jms.JMSException;
+
+
+/**
+ * Represents an expression
+ *
+ * @version $Revision$
+ */
+public interface Expression {
+
+ /**
+ * @return the value of this expression
+ */
+ public Object evaluate(AMQMessage message) throws JMSException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
new file mode 100644
index 0000000000..e700bdfe55
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+public interface FilterManager
+{
+ void add(MessageFilter filter);
+
+ void remove(MessageFilter filter);
+
+ boolean allAllow(AMQMessage msg);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
new file mode 100644
index 0000000000..5c106de6f2
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.framing.FieldTable;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+
+public class FilterManagerFactory
+{
+ //private final static Logger _logger = LoggerFactory.getLogger(FilterManagerFactory.class);
+ private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class);
+
+ //fixme move to a common class so it can be refered to from client code.
+ private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector";
+
+ public static FilterManager createManager(FieldTable filters)
+ {
+ FilterManager manager = null;
+
+ if (filters != null)
+ {
+
+ manager = new SimpleFilterManager();
+
+ Iterator it = filters.keySet().iterator();
+ _logger.info("Processing filters:");
+ while (it.hasNext())
+ {
+ String key = (String) it.next();
+ _logger.info("filter:" + key);
+ if (key.equals(JMS_SELECTOR_FILTER))
+ {
+ manager.add(new JMSSelectorFilter((String) filters.get(key)));
+ }
+
+ }
+ }
+ else
+ {
+ _logger.info("No Filters found.");
+ }
+
+ return manager;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
new file mode 100644
index 0000000000..e54fdb944d
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.filter.jms.selector.SelectorParser;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.log4j.Logger;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+public class JMSSelectorFilter implements MessageFilter
+{
+
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
+// LoggerFactory.getLogger(JMSSelectorFilter.class);
+
+ private String _selector;
+ private BooleanExpression _matcher;
+
+ public JMSSelectorFilter(String selector)
+ {
+ _selector = selector;
+ _logger.info("Created JMSSelectorFilter with selector:" + _selector);
+
+ // BooleanExpression activemqSelctor = new ActiveMQSelectorParser(selector);
+
+ try
+ {
+ _matcher = new SelectorParser().parse(selector);
+ }
+ catch (InvalidSelectorException e)
+ {
+ // fixme
+ // Will have to throw this back to the client... in the future
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ }
+
+ public boolean matches(AMQMessage message)
+ {
+ try
+ {
+ boolean match = _matcher.matches(message);
+ _logger.info(message + " match(" + match + ") selector:" + _selector);
+ return match;
+ }
+ catch (JMSException e)
+ {
+ //fixme this needs to be sorted.. it shouldn't happen
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return false;
+ }
+
+ public String getSelector()
+ {
+ return _selector;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
new file mode 100644
index 0000000000..714d8c23f5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import javax.jms.JMSException;
+
+/**
+ * A filter performing a comparison of two objects
+ *
+ * @version $Revision$
+ */
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression {
+
+ public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
+ return new LogicExpression(lvalue, rvalue) {
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+ // Can we do an OR shortcut??
+ if (lv !=null && lv.booleanValue()) {
+ return Boolean.TRUE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+ return rv==null ? null : rv;
+ }
+
+ public String getExpressionSymbol() {
+ return "OR";
+ }
+ };
+ }
+
+ public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
+ return new LogicExpression(lvalue, rvalue) {
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+
+ // Can we do an AND shortcut??
+ if (lv == null)
+ return null;
+ if (!lv.booleanValue()) {
+ return Boolean.FALSE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+ return rv == null ? null : rv;
+ }
+
+ public String getExpressionSymbol() {
+ return "AND";
+ }
+ };
+ }
+
+ /**
+ * @param left
+ * @param right
+ */
+ public LogicExpression(BooleanExpression left, BooleanExpression right) {
+ super(left, right);
+ }
+
+ abstract public Object evaluate(AMQMessage message) throws JMSException;
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
new file mode 100644
index 0000000000..b8ca75d209
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.jms.JMSException;
+
+public interface MessageFilter
+{
+ boolean matches(AMQMessage message) throws JMSException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
new file mode 100644
index 0000000000..f3e9965c2e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -0,0 +1,305 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+
+//import org.apache.activemq.command.ActiveMQDestination;
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.command.TransactionId;
+//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.log4j.Logger;
+
+/**
+ * Represents a property expression
+ *
+ * @version $Revision$
+ */
+public class PropertyExpression implements Expression
+{
+
+ interface SubExpression
+ {
+ public Object evaluate(AMQMessage message);
+ }
+
+ interface JMSExpression
+ {
+ public abstract Object evaluate(JMSMessage message);
+ }
+
+ static class SubJMSExpression implements SubExpression
+ {
+ JMSExpression _expression;
+
+ SubJMSExpression(JMSExpression expression)
+ {
+ _expression = expression;
+ }
+
+
+ public Object evaluate(AMQMessage message)
+ {
+ JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
+ if (msg != null)
+ {
+ return _expression.evaluate(msg);
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
+
+
+ static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap();
+
+ static
+ {
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSDestination();
+ }
+ }
+ ));
+//
+// public Object evaluate(AMQMessage message)
+// {
+// //fixme
+//
+//
+//// AMQDestination dest = message.getOriginalDestination();
+//// if (dest == null)
+//// {
+//// dest = message.getDestination();
+//// }
+//// if (dest == null)
+//// {
+//// return null;
+//// }
+//// return dest.toString();
+// return "";
+// }
+// });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSReplyTo();
+ }
+ })
+ );
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSType();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ try
+ {
+ Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ _logger.info("JMSDeliveryMode is :" + mode);
+ return mode;
+ }
+ catch (AMQException e)
+ {
+ //shouldn't happen
+ }
+
+ return DeliveryMode.NON_PERSISTENT;
+ }
+ }));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSPriority();
+ }
+ }
+ ));
+
+
+ JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getAMQMessage().getMessageId();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSTimestamp();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSCorrelationID();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSExpiration();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getAMQMessage().isRedelivered();
+ }
+ }
+ ));
+
+ }
+
+ private final String name;
+ private final SubExpression jmsPropertyExpression;
+
+ public PropertyExpression(String name)
+ {
+ this.name = name;
+ jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException
+ {
+// try
+// {
+// if (message.isDropped())
+// {
+// return null;
+// }
+
+ if (jmsPropertyExpression != null)
+ {
+ return jmsPropertyExpression.evaluate(message);
+ }
+// try
+ else
+ {
+
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+
+ _logger.info("Looking up property:" + name);
+ _logger.info("Properties are:" + _properties.getHeaders().keySet());
+
+ return _properties.getHeaders().get(name);
+ }
+// catch (IOException ioe)
+// {
+// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage());
+// exception.initCause(ioe);
+// throw exception;
+// }
+// }
+// catch (IOException e)
+// {
+// JMSException exception = new JMSException(e.getMessage());
+// exception.initCause(e);
+// throw exception;
+// }
+
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return name;
+ }
+
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return name.hashCode();
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if (o == null || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+ return name.equals(((PropertyExpression) o).name);
+
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
new file mode 100644
index 0000000000..1eba0b9fed
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class SimpleFilterManager implements FilterManager
+{
+ private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class);
+
+ private final ConcurrentLinkedQueue<MessageFilter> _filters;
+
+ public SimpleFilterManager()
+ {
+ _logger.debug("Creating SimpleFilterManager");
+ _filters = new ConcurrentLinkedQueue<MessageFilter>();
+ }
+
+ public void add(MessageFilter filter)
+ {
+ _filters.add(filter);
+ }
+
+ public void remove(MessageFilter filter)
+ {
+ _filters.remove(filter);
+ }
+
+ public boolean allAllow(AMQMessage msg)
+ {
+ for (MessageFilter filter : _filters)
+ {
+ try
+ {
+ if (!filter.matches(msg))
+ {
+ return false;
+ }
+ }
+ catch (JMSException e)
+ {
+ //fixme
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
new file mode 100644
index 0000000000..49ff147411
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+/**
+ * An expression which performs an operation on two expression values
+ *
+ * @version $Revision$
+ */
+public abstract class UnaryExpression implements Expression {
+
+ private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
+ protected Expression right;
+
+ public static Expression createNegate(Expression left) {
+ return new UnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ if (rvalue instanceof Number) {
+ return negate((Number) rvalue);
+ }
+ return null;
+ }
+
+ public String getExpressionSymbol() {
+ return "-";
+ }
+ };
+ }
+
+ public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) {
+
+ // Use a HashSet if there are many elements.
+ Collection t;
+ if( elements.size()==0 )
+ t=null;
+ else if( elements.size() < 5 )
+ t = elements;
+ else {
+ t = new HashSet(elements);
+ }
+ final Collection inList = t;
+
+ return new BooleanUnaryExpression(right) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ if( rvalue.getClass()!=String.class )
+ return null;
+
+ if( (inList!=null && inList.contains(rvalue)) ^ not ) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+
+ }
+
+ public String toString() {
+ StringBuffer answer = new StringBuffer();
+ answer.append(right);
+ answer.append(" ");
+ answer.append(getExpressionSymbol());
+ answer.append(" ( ");
+
+ int count=0;
+ for (Iterator i = inList.iterator(); i.hasNext();) {
+ Object o = (Object) i.next();
+ if( count!=0 ) {
+ answer.append(", ");
+ }
+ answer.append(o);
+ count++;
+ }
+
+ answer.append(" )");
+ return answer.toString();
+ }
+
+ public String getExpressionSymbol() {
+ if( not )
+ return "NOT IN";
+ else
+ return "IN";
+ }
+ };
+ }
+
+ abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression {
+ public BooleanUnaryExpression(Expression left) {
+ super(left);
+ }
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ };
+
+
+ public static BooleanExpression createNOT(BooleanExpression left) {
+ return new BooleanUnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Boolean lvalue = (Boolean) right.evaluate(message);
+ if (lvalue == null) {
+ return null;
+ }
+ return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
+ }
+
+ public String getExpressionSymbol() {
+ return "NOT";
+ }
+ };
+ }
+
+ public static BooleanExpression createXPath(final String xpath) {
+ return new XPathExpression(xpath);
+ }
+
+ public static BooleanExpression createXQuery(final String xpath) {
+ return new XQueryExpression(xpath);
+ }
+
+ public static BooleanExpression createBooleanCast(Expression left) {
+ return new BooleanUnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ return null;
+ if (!rvalue.getClass().equals(Boolean.class))
+ return Boolean.FALSE;
+ return ((Boolean)rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public String toString() {
+ return right.toString();
+ }
+
+ public String getExpressionSymbol() {
+ return "";
+ }
+ };
+ }
+
+ private static Number negate(Number left) {
+ Class clazz = left.getClass();
+ if (clazz == Integer.class) {
+ return new Integer(-left.intValue());
+ }
+ else if (clazz == Long.class) {
+ return new Long(-left.longValue());
+ }
+ else if (clazz == Float.class) {
+ return new Float(-left.floatValue());
+ }
+ else if (clazz == Double.class) {
+ return new Double(-left.doubleValue());
+ }
+ else if (clazz == BigDecimal.class) {
+ // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the
+ // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it
+ // as a Big decimal. But it gets Negated right away.. to here we try to covert it back
+ // to a Long.
+ BigDecimal bd = (BigDecimal)left;
+ bd = bd.negate();
+
+ if( BD_LONG_MIN_VALUE.compareTo(bd)==0 ) {
+ return new Long(Long.MIN_VALUE);
+ }
+ return bd;
+ }
+ else {
+ throw new RuntimeException("Don't know how to negate: "+left);
+ }
+ }
+
+ public UnaryExpression(Expression left) {
+ this.right = left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+ public void setRight(Expression expression) {
+ right = expression;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return "(" + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ abstract public String getExpressionSymbol();
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
new file mode 100644
index 0000000000..ab952b6fea
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import javax.jms.JMSException;
+
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.server.queue.AMQMessage;
+
+/**
+ * Used to evaluate an XPath Expression in a JMS selector.
+ */
+public final class XPathExpression implements BooleanExpression {
+
+ private static final Log log = LogFactory.getLog(XPathExpression.class);
+ private static final String EVALUATOR_SYSTEM_PROPERTY = "org.apache.qpid.server.filter.XPathEvaluatorClassName";
+ private static final String DEFAULT_EVALUATOR_CLASS_NAME=XalanXPathEvaluator.class.getName();
+
+ private static final Constructor EVALUATOR_CONSTRUCTOR;
+
+ static {
+ String cn = System.getProperty(EVALUATOR_SYSTEM_PROPERTY, DEFAULT_EVALUATOR_CLASS_NAME);
+ Constructor m = null;
+ try {
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e) {
+ log.warn("Invalid "+XPathEvaluator.class.getName()+" implementation: "+cn+", reason: "+e,e);
+ cn = DEFAULT_EVALUATOR_CLASS_NAME;
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e2) {
+ log.error("Default XPath evaluator could not be loaded",e);
+ }
+ }
+ } finally {
+ EVALUATOR_CONSTRUCTOR = m;
+ }
+ }
+
+ private static Constructor getXPathEvaluatorConstructor(String cn) throws ClassNotFoundException, SecurityException, NoSuchMethodException {
+ Class c = XPathExpression.class.getClassLoader().loadClass(cn);
+ if( !XPathEvaluator.class.isAssignableFrom(c) ) {
+ throw new ClassCastException(""+c+" is not an instance of "+XPathEvaluator.class);
+ }
+ return c.getConstructor(new Class[]{String.class});
+ }
+
+ private final String xpath;
+ private final XPathEvaluator evaluator;
+
+ static public interface XPathEvaluator {
+ public boolean evaluate(AMQMessage message) throws JMSException;
+ }
+
+ XPathExpression(String xpath) {
+ this.xpath = xpath;
+ this.evaluator = createEvaluator(xpath);
+ }
+
+ private XPathEvaluator createEvaluator(String xpath2) {
+ try {
+ return (XPathEvaluator)EVALUATOR_CONSTRUCTOR.newInstance(new Object[]{xpath});
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if( cause instanceof RuntimeException ) {
+ throw (RuntimeException)cause;
+ }
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ } catch (Throwable e) {
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ }
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+// try {
+//FIXME this is flow to disk work
+// if( message.isDropped() )
+// return null;
+ return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE;
+// } catch (IOException e) {
+//
+// JMSException exception = new JMSException(e.getMessage());
+// exception.initCause(e);
+// throw exception;
+//
+// }
+
+ }
+
+ public String toString() {
+ return "XPATH "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
new file mode 100644
index 0000000000..53764cbf75
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.jms.JMSException;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+/**
+ * Used to evaluate an XQuery Expression in a JMS selector.
+ */
+public final class XQueryExpression implements BooleanExpression {
+ private final String xpath;
+
+ XQueryExpression(String xpath) {
+ super();
+ this.xpath = xpath;
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ return Boolean.FALSE;
+ }
+
+ public String toString() {
+ return "XQUERY "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
new file mode 100644
index 0000000000..4b78fd18df
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.StringReader;
+import java.io.ByteArrayInputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.xpath.CachedXPathAPI;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.w3c.dom.Document;
+import org.w3c.dom.traversal.NodeIterator;
+import org.xml.sax.InputSource;
+
+public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
+
+ private final String xpath;
+
+ public XalanXPathEvaluator(String xpath) {
+ this.xpath = xpath;
+ }
+
+ public boolean evaluate(AMQMessage m) throws JMSException {
+ if( m instanceof TextMessage ) {
+ String text = ((TextMessage)m).getText();
+ return evaluate(text);
+ } else if ( m instanceof BytesMessage ) {
+ BytesMessage bm = (BytesMessage) m;
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return evaluate(data);
+ }
+ return false;
+ }
+
+ private boolean evaluate(byte[] data) {
+ try {
+
+ InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+
+ private boolean evaluate(String text) {
+ try {
+ InputSource inputSource = new InputSource(new StringReader(text));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ // We should associated the cachedXPathAPI object with the message being evaluated
+ // since that should speedup subsequent xpath expressions.
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index d4c94061a0..c4c995540d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -74,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck);
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.filter);
if(!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
new file mode 100644
index 0000000000..aba3b88a59
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.message;
+
+public interface MessageDecorator
+{
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
new file mode 100644
index 0000000000..8cfc3e5b6c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.message.jms;
+
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import javax.jms.MessageNotWriteableException;
+import java.util.Enumeration;
+
+public class JMSMessage implements MessageDecorator
+{
+
+ private AMQMessage _message;
+ private BasicContentHeaderProperties _properties;
+ private Destination _replyTo;
+
+ public JMSMessage(AMQMessage message)
+ {
+ _message = message;
+ ContentHeaderBody contentHeader = message.getContentHeaderBody();
+ _properties = (BasicContentHeaderProperties) contentHeader.properties;
+ }
+
+ protected void checkWriteable()
+ {
+// The broker should not modify a message.
+// if (_readableMessage)
+// {
+// throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+// }
+ }
+
+
+ public String getJMSMessageID()
+ {
+ return _properties.getMessageId();
+ }
+
+ public void setJMSMessageID(String string)
+ {
+ checkWriteable();
+ }
+
+ public long getJMSTimestamp()
+ {
+ return _properties.getTimestamp();
+ }
+
+ public void setJMSTimestamp(long l)
+ {
+ checkWriteable();
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes()
+ {
+ return _properties.getCorrelationId().getBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes)
+ {
+ checkWriteable();
+ }
+
+ public void setJMSCorrelationID(String string)
+ {
+ checkWriteable();
+ }
+
+ public String getJMSCorrelationID()
+ {
+ return _properties.getCorrelationId();
+ }
+
+ public String getJMSReplyTo()
+ {
+ return _properties.getReplyTo();
+ }
+
+ public void setJMSReplyTo(Destination destination)
+ {
+ checkWriteable();
+ }
+
+ public String getJMSDestination()
+ {
+ //FIXME Currently the Destination has not been defined.
+ return "";
+ }
+
+ public void setJMSDestination(Destination destination)
+ {
+ checkWriteable();
+ }
+
+ public int getJMSDeliveryMode()
+ {
+ return _properties.getDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i)
+ {
+ checkWriteable();
+ }
+
+ public boolean getJMSRedelivered()
+ {
+ return _message.isRedelivered();
+ }
+
+ public void setJMSRedelivered(boolean b)
+ {
+ checkWriteable();
+ }
+
+ public String getJMSType()
+ {
+ return _properties.getType();
+ }
+
+ public void setJMSType(String string)
+ {
+ checkWriteable();
+ }
+
+ public long getJMSExpiration()
+ {
+ return _properties.getExpiration();
+ }
+
+ public void setJMSExpiration(long l)
+ {
+ checkWriteable();
+ }
+
+ public int getJMSPriority()
+ {
+ return _properties.getPriority();
+ }
+
+ public void setJMSPriority(int i)
+ {
+ checkWriteable();
+ }
+
+ public void clearProperties()
+ {
+ checkWriteable();
+ }
+
+ public boolean propertyExists(String string)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean getBooleanProperty(String string)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public byte getByteProperty(String string)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public short getShortProperty(String string)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public int getIntProperty(String string)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getLongProperty(String string)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public float getFloatProperty(String string)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public double getDoubleProperty(String string)
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String getStringProperty(String string)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object getObjectProperty(String string)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Enumeration getPropertyNames()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setBooleanProperty(String string, boolean b)
+ {
+ checkWriteable();
+ }
+
+ public void setByteProperty(String string, byte b)
+ {
+ checkWriteable();
+ }
+
+ public void setShortProperty(String string, short i)
+ {
+ checkWriteable();
+ }
+
+ public void setIntProperty(String string, int i)
+ {
+ checkWriteable();
+ }
+
+ public void setLongProperty(String string, long l)
+ {
+ checkWriteable();
+ }
+
+ public void setFloatProperty(String string, float v)
+ {
+ checkWriteable();
+ }
+
+ public void setDoubleProperty(String string, double v)
+ {
+ checkWriteable();
+ }
+
+ public void setStringProperty(String string, String string1)
+ {
+ checkWriteable();
+ }
+
+ public void setObjectProperty(String string, Object object)
+ {
+ checkWriteable();
+ }
+
+ public void acknowledge()
+ {
+ checkWriteable();
+ }
+
+ public void clearBody()
+ {
+ checkWriteable();
+ }
+
+ public AMQMessage getAMQMessage()
+ {
+ return _message;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 8b6db5b53f..69b03c9cd2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,6 +25,8 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
import java.util.ArrayList;
@@ -33,17 +35,20 @@ import java.util.LinkedList;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Combines the information that make up a deliverable message into a more manageable form.
*/
public class AMQMessage
{
+ public static final String JMS_MESSAGE = "jms.message";
+
private final Set<Object> _tokens = new HashSet<Object>();
private AMQProtocolSession _publisher;
- private final BasicPublishBody _publishBody;
+ private final BasicPublishBody _publishBody;
private ContentHeaderBody _contentHeaderBody;
@@ -83,6 +88,7 @@ public class AMQMessage
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
+ private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -96,17 +102,19 @@ public class AMQMessage
_publishBody = publishBody;
_store = messageStore;
_contentBodies = new LinkedList<ContentBody>();
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
}
public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
throws AMQException
-
+
{
_publishBody = publishBody;
_contentHeaderBody = contentHeaderBody;
_contentBodies = contentBodies;
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_messageId = messageId;
_store = store;
storeMessage();
@@ -116,7 +124,7 @@ public class AMQMessage
ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
throws AMQException
{
- this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
+ this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
}
protected AMQMessage(AMQMessage msg) throws AMQException
@@ -270,7 +278,7 @@ public class AMQMessage
{
_store.removeMessage(_messageId);
}
- catch(AMQException e)
+ catch (AMQException e)
{
//to maintain consistency, we revert the count
incrementReference();
@@ -291,7 +299,7 @@ public class AMQMessage
public boolean checkToken(Object token)
{
- if(_tokens.contains(token))
+ if (_tokens.contains(token))
{
return true;
}
@@ -307,7 +315,7 @@ public class AMQMessage
//if the message is not persistent or the queue is not durable
//we will not need to recover the association and so do not
//need to record it
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.enqueueMessage(queue.getName(), _messageId);
}
@@ -317,7 +325,7 @@ public class AMQMessage
{
//only record associations where both queue and message will survive
//a restart, so only need to remove association if this is the case
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.dequeueMessage(queue.getName(), _messageId);
}
@@ -325,14 +333,14 @@ public class AMQMessage
public boolean isPersistent() throws AMQException
{
- if(_contentHeaderBody == null)
+ if (_contentHeaderBody == null)
{
throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
}
//todo remove literal values to a constant file such as AMQConstants in common
return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
}
public void setTxnBuffer(TxnBuffer buffer)
@@ -346,13 +354,15 @@ public class AMQMessage
}
/**
- * Called to enforce the 'immediate' flag.
+ * Called to enforce the 'immediate' flag.
+ *
* @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException{
- if(isImmediate() && !_deliveredToConsumer)
+ public void checkDeliveredToConsumer() throws NoConsumersException
+ {
+ if (isImmediate() && !_deliveredToConsumer)
{
throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
}
@@ -362,7 +372,43 @@ public class AMQMessage
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
*/
- public void setDeliveredToConsumer(){
+ public void setDeliveredToConsumer()
+ {
_deliveredToConsumer = true;
}
+
+
+ public MessageDecorator getDecodedMessage(String type)
+ {
+ MessageDecorator msgtype = null;
+
+ if (_decodedMessages != null)
+ {
+ msgtype = _decodedMessages.get(type);
+
+ if (msgtype == null)
+ {
+ msgtype = decorateMessage(type);
+ }
+ }
+
+ return msgtype;
+ }
+
+ private MessageDecorator decorateMessage(String type)
+ {
+ MessageDecorator msgdec = null;
+
+ if (type.equals(JMS_MESSAGE))
+ {
+ msgdec = new JMSMessage(this);
+ }
+
+ if (msgdec != null)
+ {
+ _decodedMessages.put(type, msgdec);
+ }
+
+ return msgdec;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 353a2007c0..f734edcc7b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -141,8 +142,8 @@ public class AMQQueue implements Managable, Comparable
// OpenMBean data types for viewMessageContent method
private CompositeType _msgContentType = null;
- private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
- private OpenType[] _msgContentAttributeTypes = new OpenType[4];
+ private String[] _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+ private OpenType[] _msgContentAttributeTypes = new OpenType[4];
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean() throws JMException
@@ -162,14 +163,14 @@ public class AMQQueue implements Managable, Comparable
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
+ _msgContentAttributes, _msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
- _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+ _messageDataType = new CompositeType("Message", "AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
_messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
}
@@ -265,7 +266,7 @@ public class AMQQueue implements Managable, Comparable
{
queueDepth = queueDepth + getMessageSize(message);
}
- return (long)Math.round(queueDepth / 1000);
+ return (long) Math.round(queueDepth / 1000);
}
/**
@@ -314,7 +315,7 @@ public class AMQQueue implements Managable, Comparable
private void notifyClients(String notificationMsg)
{
Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
@@ -361,7 +362,7 @@ public class AMQQueue implements Managable, Comparable
if (msg == null)
{
- throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName );
+ throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
// get message content
List<ContentBody> cBodies = msg.getContentBodies();
@@ -379,7 +380,7 @@ public class AMQQueue implements Managable, Comparable
}
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
String mimeType = headerProperties.getContentType();
String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
@@ -402,12 +403,12 @@ public class AMQQueue implements Managable, Comparable
TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
// Create the tabular list of message header contents
- for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
+ for (int i = beginIndex; i <= endIndex && i <= list.size(); i++)
{
AMQMessage msg = list.get(i - 1);
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
List<String> headerAttribsList = new ArrayList<String>();
headerAttribsList.add("App Id=" + headerProperties.getAppId());
headerAttribsList.add("MimeType=" + headerProperties.getContentType());
@@ -430,7 +431,7 @@ public class AMQQueue implements Managable, Comparable
@Override
public MBeanNotificationInfo[] getNotificationInfo()
{
- String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+ String[] notificationTypes = new String[]{MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
String name = MonitorNotification.class.getName();
String description = "Either Message count or Queue depth or Message size has reached threshold high value";
MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
@@ -581,12 +582,12 @@ public class AMQQueue implements Managable, Comparable
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
_subscribers.addSubscriber(subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index dfc16a7c71..35ac9a13ac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.MessageFilter;
public interface Subscription
{
@@ -29,4 +30,8 @@ public interface Subscription
boolean isSuspended();
void queueDeleted(AMQQueue queue);
+
+ boolean hasFilters();
+
+ boolean hasInterest(AMQMessage msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index 0fd44e4fbc..f464384562 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,6 +33,9 @@ import org.apache.qpid.AMQException;
*/
public interface SubscriptionFactory
{
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException;
+
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 5cad28b80d..a0d86deb19 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -26,7 +26,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
/**
@@ -52,19 +55,25 @@ public class SubscriptionImpl implements Subscription
* True if messages need to be acknowledged
*/
private final boolean _acks;
+ private FilterManager _filters;
public static class Factory implements SubscriptionFactory
{
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
+ }
+
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
}
}
@@ -72,6 +81,13 @@ public class SubscriptionImpl implements Subscription
String consumerTag, boolean acks)
throws AMQException
{
+ this(channelId, protocolSession, consumerTag, acks, null);
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException
+ {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -83,14 +99,9 @@ public class SubscriptionImpl implements Subscription
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _filters = FilterManagerFactory.createManager(filters);
}
- public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
- String consumerTag)
- throws AMQException
- {
- this(channel, protocolSession, consumerTag, false);
- }
public boolean equals(Object o)
{
@@ -131,7 +142,7 @@ public class SubscriptionImpl implements Subscription
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
-
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -178,6 +189,16 @@ public class SubscriptionImpl implements Subscription
channel.queueDeleted(queue);
}
+ public boolean hasFilters()
+ {
+ return _filters != null;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return _filters.allAllow(msg);
+ }
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 7cc3f5f719..a8f778244e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -90,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,29 +108,59 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+
+ if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg)))
+ {
+ return subscription;
+ }
+ // 2006-12-04 : It is fairer to simply skip the person who isn't interested.
+ // Although it does need to be looked at again.
+
+// else
+// {
+// //Don't take penalise a subscriber for not wanting this message.
+// // This would introduce unfairness sticking with the current subscriber
+// // will allow the next message to match.. although could lead to unfairness if:
+// // subscribers: a(bin) b(text) c(text)
+// // msgs : 1(text) 2(text) 3(bin)
+// // subscriber c won't get any messages. as the first two text msgs will go to b and then a will get
+// // the bin msg.
+// // Never said this was fair round-robin-ing.
+// //FIXME - Make a fair round robin.
+//
+// --_currentSubscriber;
+// }
}
}
return null;
@@ -149,7 +182,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -159,7 +195,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -167,9 +206,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
for (Subscription s : _subscriptions)
{
@@ -177,7 +217,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index bfd294f09e..885f7647bc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -829,14 +829,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQDestination amqd = (AMQDestination) destination;
final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
- // TODO: construct the rawSelector from the selector string if rawSelector == null
+
final FieldTable ft = FieldTableFactory.newFieldTable();
- //if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
+
+ // Add headers for headers exchange
if (rawSelector != null)
{
ft.putAll(rawSelector);
}
+
+
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
@@ -915,6 +917,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
protocolHandler.writeFrame(queueBind);
}
+// /**
+// * Register to consume from the queue.
+// *
+// * @param queueName
+// * @return the consumer tag generated by the broker
+// */
+// private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
+// boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+// {
+// return consumeFromQueue(queueName, protocolHandler, prefetchHigh, prefetchLow, noLocal, exclusive, acknowledgeMode, null);
+// }
+
/**
* Register to consume from the queue.
*
@@ -922,16 +936,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @return the consumer tag generated by the broker
*/
private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
- boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
+ boolean noLocal, boolean exclusive, int acknowledgeMode, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
+ FieldTable ft = new FieldTable();
+
+ if (messageSelector != null)
+ {
+ //fixme move literal value to a common class.
+ ft.put("x-filter-jms-selector", messageSelector);
+ }
+
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
queueName, tag, noLocal,
acknowledgeMode == Session.NO_ACKNOWLEDGE,
- exclusive, true);
+ exclusive, true, ft);
+
protocolHandler.writeFrame(jmsConsume);
return tag;
@@ -1218,11 +1241,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
- consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
+ String consumerTag = null;
+ try
+ {
+ consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
+ consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode(),
+ consumer.getMessageSelector());
+
+ consumer.setConsumerTag(consumerTag);
+ _consumers.put(consumerTag, consumer);
+ }
+ catch (JMSException e)
+ {
+ // getMessageSelector throws JMSEx but it is simply a string return so won't happen.
+ }
+
- consumer.setConsumerTag(consumerTag);
- _consumers.put(consumerTag, consumer);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index fbb55ae289..aaf0320afb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -58,7 +58,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
{
_data.acquire();
}
- _readableProperties = false;
+ // ContentHeaderProperties are just created and so are empty
+ //_readableProperties = (_contentHeaderProperties != null);
_readableMessage = (data != null);
}
@@ -424,7 +425,15 @@ public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms
buf.append("\nJMS priority: ").append(getJMSPriority());
buf.append("\nJMS delivery mode: ").append(getJMSDeliveryMode());
buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
+ buf.append("\nJMS Type: ").append(String.valueOf(getJMSType()));
+ buf.append("\nJMS CorrelationID: ").append(String.valueOf(getJMSCorrelationID()));
+ buf.append("\nJMS Destination: NOT IMPLEMENTED");//.append(String.valueOf(getJMSDestination()));
+ buf.append("\nJMS MessageID: ").append(String.valueOf(getJMSMessageID()));
+ buf.append("\nJMS Redelivered: ").append(String.valueOf(getJMSRedelivered()));
+ buf.append("\nProperty Names: ").append(String.valueOf(getPropertyNames()));
+
buf.append("\nAMQ message number: ").append(_deliveryTag);
+
buf.append("\nProperties:");
if (getJmsContentHeaderProperties().getHeaders().isEmpty())
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java
new file mode 100644
index 0000000000..bccf5b4ccd
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableInTextMessageTest.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.jms.*;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+
+public class FieldTableInTextMessageTest extends TestCase implements MessageListener
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(FieldTableInTextMessageTest.class);
+
+ private AMQConnection _connection;
+ private Destination _destination;
+ private AMQSession _session;
+ private Message original_message = null;
+ private Message received_message = null;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ Destination destination = new AMQQueue(randomize("TextMessageTest"), true);
+ init(connection, destination);
+ }
+
+ private void init(AMQConnection connection, Destination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+
+ //set up a slow consumer
+ _session.createConsumer(destination).setMessageListener(this);
+ connection.start();
+ }
+
+ public void test() throws Exception
+ {
+ send();
+ waitFor();
+ check();
+ System.out.println("Completed without failure");
+ _connection.close();
+ }
+
+ void send() throws JMSException
+ {
+ //create a publisher
+ MessageProducer producer = _session.createProducer(_destination);
+ Message message = _session.createTextMessage("Message Body");
+ message.setBooleanProperty("boolan", true);
+ message.setByteProperty("byte", Byte.MAX_VALUE);
+ message.setDoubleProperty("double", Double.MAX_VALUE);
+ message.setFloatProperty("float", Float.MAX_VALUE);
+ message.setIntProperty("int", Integer.MAX_VALUE);
+ message.setLongProperty("long", Long.MAX_VALUE);
+ message.setShortProperty("short", Short.MAX_VALUE);
+ message.setStringProperty("String", "String");
+
+
+ original_message = message;
+ _logger.info("Sending Message:" + message);
+ producer.send(message);
+
+ }
+
+ void waitFor() throws InterruptedException
+ {
+ synchronized(received_message)
+ {
+ received_message.wait();
+ }
+ }
+
+ void check() throws JMSException
+ {
+ _logger.info("Received Message:" + received_message);
+ assertEqual(original_message, received_message);
+ }
+
+ private static void assertEqual(Message expected, Message actual)
+ {
+ _logger.info("Expected:" + expected);
+ _logger.info("Actual:" + actual);
+ }
+
+ public void onMessage(Message message)
+ {
+ synchronized(received_message)
+ {
+ received_message = message;
+ received_message.notify();
+ }
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ FieldTableInTextMessageTest test = new FieldTableInTextMessageTest();
+ test._connectionString = argv.length == 0 ? "vm://:1" : argv[0];
+ test.setUp();
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(FieldTableInTextMessageTest.class));
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
new file mode 100644
index 0000000000..28b0a1c2f7
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.test.VMBrokerSetup;
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.DeliveryMode;
+
+import junit.framework.TestCase;
+
+public class SelectorTest extends TestCase implements MessageListener
+{
+
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(SelectorTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private int count;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(randomize("SessionStartTest"), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+
+
+ String selector= null;
+// selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ //_session.createConsumer(destination).setMessageListener(this);
+ _session.createConsumer(destination, selector).setMessageListener(this);
+ }
+
+ public synchronized void test() throws JMSException, InterruptedException
+ {
+ try
+ {
+ Message msg = _session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ System.out.println("Message sent, waiting for response...");
+ wait(1000);
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ //throw new RuntimeException("Did not get message!");
+ }
+ }
+ finally
+ {
+ _session.close();
+ _connection.close();
+ }
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ notify();
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0];
+ test.setUp();
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new VMBrokerSetup(new junit.framework.TestSuite(SelectorTest.class));
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 0268ff2171..1394e7e20b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -88,9 +88,19 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
}
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
index 2121346c02..f8c621e413 100644
--- a/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
+++ b/java/common/src/main/java/org/apache/qpid/url/URLHelper.java
@@ -64,9 +64,9 @@ public class URLHelper
if (valueIndex + 1 < options.length())
{
if (options.charAt(valueIndex + 1) == DEFAULT_OPTION_SEPERATOR ||
- options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR ||
- options.charAt(valueIndex + 1) == BROKER_SEPARATOR ||
- options.charAt(valueIndex + 1) == '\'')
+ options.charAt(valueIndex + 1) == ALTERNATIVE_OPTION_SEPARATOR ||
+ options.charAt(valueIndex + 1) == BROKER_SEPARATOR ||
+ options.charAt(valueIndex + 1) == '\'')
{
nestedQuotes--;
// System.out.println(
@@ -119,7 +119,7 @@ public class URLHelper
else
{
parseError(sepIndex, "Unterminated option. Possible illegal option separator:'" +
- options.charAt(sepIndex) + "'", options);
+ options.charAt(sepIndex) + "'", options);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
index 3ff7195794..5cac2505a8 100644
--- a/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
+++ b/java/common/src/main/java/org/apache/qpid/url/URLSyntaxException.java
@@ -62,12 +62,12 @@ public class URLSyntaxException extends URISyntaxException
if (getIndex() > -1)
{
- if (_length != -1)
+ if (_length > 1)
{
sb.append(" between indicies ");
sb.append(getIndex());
sb.append(" and ");
- sb.append(_length);
+ sb.append(getIndex() + _length);
}
else
{
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
index de6df4bc03..2de1a0fe69 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
@@ -70,6 +70,16 @@ public class TestSubscription implements Subscription
{
}
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
public int hashCode()
{
return key.hashCode();
diff --git a/specs/amqp-8.0.xml b/specs/amqp-8.0.xml
index 6b3a438f08..6a91263a99 100644
--- a/specs/amqp-8.0.xml
+++ b/specs/amqp-8.0.xml
@@ -2085,6 +2085,13 @@ localised reply text
method it will raise a channel or connection exception.
</doc>
</field>
+
+ <field name="filter" type="table" label="arguments for consuming">
+ <doc>
+ A set of filters for the consume. The syntax and semantics
+ of these filters depends on the providers implementation.
+ </doc>
+ </field>
</method>
<method name = "consume-ok" synchronous = "1" index = "21">
@@ -2446,9 +2453,9 @@ localised reply text
A client MUST NOT use this method as a means of selecting messages
to process. A rejected message MAY be discarded or dead-lettered,
not necessarily passed to another client.
- </doc>
+ </doc>
<chassis name = "server" implement = "MUST" />
-
+
<field name = "delivery tag" domain = "delivery tag" />
<field name = "requeue" type = "bit">
@@ -2490,7 +2497,7 @@ localised reply text
The server MUST set the redelivered flag on all messages that are resent.
</doc>
<doc name="rule">
- The server MUST raise a channel exception if this is called on a
+ The server MUST raise a channel exception if this is called on a
transacted channel.
</doc>
</method>
@@ -2792,7 +2799,7 @@ localised reply text
<response name = "open-ok" />
<chassis name = "server" implement = "MUST" />
<chassis name = "client" implement = "MUST" />
-
+
<field name = "identifier" type = "shortstr">
staging identifier
<doc>
@@ -2829,7 +2836,7 @@ localised reply text
<response name = "stage" />
<chassis name = "server" implement = "MUST" />
<chassis name = "client" implement = "MUST" />
-
+
<field name = "staged size" type = "longlong">
already staged amount
<doc>
@@ -3045,7 +3052,7 @@ localised reply text
</doc>
<chassis name = "server" implement = "MUST" />
<field name = "delivery tag" domain = "delivery tag" />
-
+
<field name = "multiple" type = "bit">
acknowledge multiple messages
<doc>
@@ -3084,7 +3091,7 @@ localised reply text
not necessarily passed to another client.
</doc>
<chassis name = "server" implement = "MUST" />
-
+
<field name = "delivery tag" domain = "delivery tag" />
<field name = "requeue" type = "bit">
@@ -3483,7 +3490,7 @@ localised reply text
<doc>
Specifies the routing key name specified when the message was
published.
- </doc>
+ </doc>
</field>
</method>