summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-19 10:51:39 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-19 10:51:39 +0000
commitd3459b6f6e751e77eecac781e4701a4d15290a43 (patch)
tree668cf7edb2b9aac645914f679ff1faae8578a95c
parent88237af17ad42593cf826a471bd51838318ca586 (diff)
downloadqpid-python-d3459b6f6e751e77eecac781e4701a4d15290a43.tar.gz
QPID-21
Added: SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java server/filter - Selector Filtering code from ActiveMQ project adjusted to suite our class and package structure. server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid. Common: log4j.properties to remove error log4j warnings on Common tests. Modified: broker/pom.xml - to generate SelectorParser.java AMQChannel.java - Addition of argument fieldtable for filter setup. BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception. AMQMessage.java - Added decorator to get access to the enclosed JMSMessage AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager. Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message. SubscriptionFactory.java - Added method to allow passing of filter arguments. SubscriptionImpl.java - Implemented new Subscription.java methods. SubscriptionManager.java - Added ability to get a list of current subscribers. SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exert the new hasInterest feature. SynchronizedDeliveryManager.java - fixed Logging class AMQSession - Added filter extraction from consume call and pass it on to the registration. ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception AbstractJMSMessage.java - Expanded imports BlockingMethodFrameListener.java - added extra info to a debug output line. SocketTransportConnection.java - made output an info not a warn. PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values. ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java NestedSubscriptionManager.java - Implementation of SubscriptionManager.java RemoteSubscriptionImpl.java - Implementation Subscription.java AMQConstant.java - Added '322' "Invalid Selector" SubscriptionTestHelper.java - Implementation of Subscription.java Edited specs/amqp-8.0.xml to add field table to consume method. Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488624 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/pom.xml23
-rw-r--r--java/broker/src/main/grammar/SelectorParser.jj598
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java216
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java465
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java170
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java37
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java81
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java79
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java95
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java305
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java77
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java263
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java130
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java99
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java306
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java88
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java352
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java65
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java70
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java23
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java41
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java141
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java25
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java35
-rw-r--r--java/common/src/main/java/log4j.properties28
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java31
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java4
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java13
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java21
-rw-r--r--specs/amqp-8.0.xml8
49 files changed, 4263 insertions, 102 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index aea2d5878a..5f4c490fd4 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -55,6 +55,10 @@
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-filter-ssl</artifactId>
</dependency>
@@ -83,6 +87,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <version>2.0</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceDirectory>${basedir}/src/main/grammar</sourceDirectory>
+ <outputDirectory>${basedir}/target/generated</outputDirectory>
+ <packageName>org.apache.qpid.server.filter.jms.selector</packageName>
+ </configuration>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj
new file mode 100644
index 0000000000..5553a46e47
--- /dev/null
+++ b/java/broker/src/main/grammar/SelectorParser.jj
@@ -0,0 +1,598 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+ //
+ // Original File from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+ //
+
+// ----------------------------------------------------------------------------
+// OPTIONS
+// ----------------------------------------------------------------------------
+options {
+ STATIC = false;
+ UNICODE_INPUT = true;
+
+ // some performance optimizations
+ OPTIMIZE_TOKEN_MANAGER = true;
+ ERROR_REPORTING = false;
+}
+
+// ----------------------------------------------------------------------------
+// PARSER
+// ----------------------------------------------------------------------------
+
+PARSER_BEGIN(SelectorParser)
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.filter.jms.selector;
+
+import java.io.*;
+import java.util.*;
+
+import javax.jms.InvalidSelectorException;
+
+import org.apache.qpid.server.filter.*;
+
+/**
+ * JMS Selector Parser generated by JavaCC
+ *
+ * Do not edit this .java file directly - it is autogenerated from SelectorParser.jj
+ */
+public class SelectorParser {
+
+ public SelectorParser() {
+ this(new StringReader(""));
+ }
+
+ public BooleanExpression parse(String sql) throws InvalidSelectorException {
+ this.ReInit(new StringReader(sql));
+
+ try {
+ return this.JmsSelector();
+ }
+ catch (Throwable e) {
+ throw (InvalidSelectorException)new InvalidSelectorException(sql).initCause(e);
+ }
+
+ }
+
+ private BooleanExpression asBooleanExpression(Expression value) throws ParseException {
+ if (value instanceof BooleanExpression) {
+ return (BooleanExpression) value;
+ }
+ if (value instanceof PropertyExpression) {
+ return UnaryExpression.createBooleanCast( value );
+ }
+ throw new ParseException("Expression will not result in a boolean value: " + value);
+ }
+
+
+}
+
+PARSER_END(SelectorParser)
+
+// ----------------------------------------------------------------------------
+// Tokens
+// ----------------------------------------------------------------------------
+
+/* White Space */
+SPECIAL_TOKEN :
+{
+ " " | "\t" | "\n" | "\r" | "\f"
+}
+
+/* Comments */
+SKIP:
+{
+ <LINE_COMMENT: "--" (~["\n","\r"])* ("\n"|"\r"|"\r\n") >
+}
+
+SKIP:
+{
+ <BLOCK_COMMENT: "/*" (~["*"])* "*" ("*" | (~["*","/"] (~["*"])* "*"))* "/">
+}
+
+/* Reserved Words */
+TOKEN [IGNORE_CASE] :
+{
+ < NOT : "NOT">
+ | < AND : "AND">
+ | < OR : "OR">
+ | < BETWEEN : "BETWEEN">
+ | < LIKE : "LIKE">
+ | < ESCAPE : "ESCAPE">
+ | < IN : "IN">
+ | < IS : "IS">
+ | < TRUE : "TRUE" >
+ | < FALSE : "FALSE" >
+ | < NULL : "NULL" >
+ | < XPATH : "XPATH" >
+ | < XQUERY : "XQUERY" >
+}
+
+/* Literals */
+TOKEN [IGNORE_CASE] :
+{
+
+ < DECIMAL_LITERAL: ["1"-"9"] (["0"-"9"])* (["l","L"])? >
+ | < HEX_LITERAL: "0" ["x","X"] (["0"-"9","a"-"f","A"-"F"])+ >
+ | < OCTAL_LITERAL: "0" (["0"-"7"])* >
+ | < FLOATING_POINT_LITERAL:
+ (["0"-"9"])+ "." (["0"-"9"])* (<EXPONENT>)? // matches: 5.5 or 5. or 5.5E10 or 5.E10
+ | "." (["0"-"9"])+ (<EXPONENT>)? // matches: .5 or .5E10
+ | (["0"-"9"])+ <EXPONENT> // matches: 5E10
+ >
+ | < #EXPONENT: "E" (["+","-"])? (["0"-"9"])+ >
+ | < STRING_LITERAL: "'" ( ("''") | ~["'"] )* "'" >
+}
+
+TOKEN [IGNORE_CASE] :
+{
+ < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+}
+
+// ----------------------------------------------------------------------------
+// Grammer
+// ----------------------------------------------------------------------------
+BooleanExpression JmsSelector() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = orExpression()
+ )
+ {
+ return asBooleanExpression(left);
+ }
+
+}
+
+Expression orExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = andExpression()
+ (
+ <OR> right = andExpression()
+ {
+ left = LogicExpression.createOR(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+
+}
+
+
+Expression andExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = equalityExpression()
+ (
+ <AND> right = equalityExpression()
+ {
+ left = LogicExpression.createAND(asBooleanExpression(left), asBooleanExpression(right));
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression equalityExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ (
+ left = comparisonExpression()
+ (
+
+ "=" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createEqual(left, right);
+ }
+ |
+ "<>" right = comparisonExpression()
+ {
+ left = ComparisonExpression.createNotEqual(left, right);
+ }
+ |
+ LOOKAHEAD(2)
+ <IS> <NULL>
+ {
+ left = ComparisonExpression.createIsNull(left);
+ }
+ |
+ <IS> <NOT> <NULL>
+ {
+ left = ComparisonExpression.createIsNotNull(left);
+ }
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression comparisonExpression() :
+{
+ Expression left;
+ Expression right;
+ Expression low;
+ Expression high;
+ String t, u;
+ boolean not;
+ ArrayList list;
+}
+{
+ (
+ left = addExpression()
+ (
+
+ ">" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThan(left, right);
+ }
+ |
+ ">=" right = addExpression()
+ {
+ left = ComparisonExpression.createGreaterThanEqual(left, right);
+ }
+ |
+ "<" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThan(left, right);
+ }
+ |
+ "<=" right = addExpression()
+ {
+ left = ComparisonExpression.createLessThanEqual(left, right);
+ }
+ |
+ {
+ u=null;
+ }
+ <LIKE> t = stringLitteral()
+ [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createLike(left, t, u);
+ }
+ |
+ LOOKAHEAD(2)
+ {
+ u=null;
+ }
+ <NOT> <LIKE> t = stringLitteral() [ <ESCAPE> u = stringLitteral() ]
+ {
+ left = ComparisonExpression.createNotLike(left, t, u);
+ }
+ |
+ <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createBetween(left, low, high);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <BETWEEN> low = addExpression() <AND> high = addExpression()
+ {
+ left = ComparisonExpression.createNotBetween(left, low, high);
+ }
+ |
+ <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createInFilter(left, list);
+ }
+ |
+ LOOKAHEAD(2)
+ <NOT> <IN>
+ "("
+ t = stringLitteral()
+ {
+ list = new ArrayList();
+ list.add( t );
+ }
+ (
+ ","
+ t = stringLitteral()
+ {
+ list.add( t );
+ }
+
+ )*
+ ")"
+ {
+ left = ComparisonExpression.createNotInFilter(left, list);
+ }
+
+ )*
+ )
+ {
+ return left;
+ }
+}
+
+Expression addExpression() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = multExpr()
+ (
+ LOOKAHEAD( ("+"|"-") multExpr())
+ (
+ "+" right = multExpr()
+ {
+ left = ArithmeticExpression.createPlus(left, right);
+ }
+ |
+ "-" right = multExpr()
+ {
+ left = ArithmeticExpression.createMinus(left, right);
+ }
+ )
+
+ )*
+ {
+ return left;
+ }
+}
+
+Expression multExpr() :
+{
+ Expression left;
+ Expression right;
+}
+{
+ left = unaryExpr()
+ (
+ "*" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMultiply(left, right);
+ }
+ |
+ "/" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createDivide(left, right);
+ }
+ |
+ "%" right = unaryExpr()
+ {
+ left = ArithmeticExpression.createMod(left, right);
+ }
+
+ )*
+ {
+ return left;
+ }
+}
+
+
+Expression unaryExpr() :
+{
+ String s=null;
+ Expression left=null;
+}
+{
+ (
+ LOOKAHEAD( "+" unaryExpr() )
+ "+" left=unaryExpr()
+ |
+ "-" left=unaryExpr()
+ {
+ left = UnaryExpression.createNegate(left);
+ }
+ |
+ <NOT> left=unaryExpr()
+ {
+ left = UnaryExpression.createNOT( asBooleanExpression(left) );
+ }
+ |
+ <XPATH> s=stringLitteral()
+ {
+ left = UnaryExpression.createXPath( s );
+ }
+ |
+ <XQUERY> s=stringLitteral()
+ {
+ left = UnaryExpression.createXQuery( s );
+ }
+ |
+ left = primaryExpr()
+ )
+ {
+ return left;
+ }
+
+}
+
+Expression primaryExpr() :
+{
+ Expression left=null;
+}
+{
+ (
+ left = literal()
+ |
+ left = variable()
+ |
+ "(" left = orExpression() ")"
+ )
+ {
+ return left;
+ }
+}
+
+
+
+ConstantExpression literal() :
+{
+ Token t;
+ String s;
+ ConstantExpression left=null;
+}
+{
+ (
+ (
+ s = stringLitteral()
+ {
+ left = new ConstantExpression(s);
+ }
+ )
+ |
+ (
+ t = <DECIMAL_LITERAL>
+ {
+ left = ConstantExpression.createFromDecimal(t.image);
+ }
+ )
+ |
+ (
+ t = <HEX_LITERAL>
+ {
+ left = ConstantExpression.createFromHex(t.image);
+ }
+ )
+ |
+ (
+ t = <OCTAL_LITERAL>
+ {
+ left = ConstantExpression.createFromOctal(t.image);
+ }
+ )
+ |
+ (
+ t = <FLOATING_POINT_LITERAL>
+ {
+ left = ConstantExpression.createFloat(t.image);
+ }
+ )
+ |
+ (
+ <TRUE>
+ {
+ left = ConstantExpression.TRUE;
+ }
+ )
+ |
+ (
+ <FALSE>
+ {
+ left = ConstantExpression.FALSE;
+ }
+ )
+ |
+ (
+ <NULL>
+ {
+ left = ConstantExpression.NULL;
+ }
+ )
+ )
+ {
+ return left;
+ }
+}
+
+String stringLitteral() :
+{
+ Token t;
+ StringBuffer rc = new StringBuffer();
+ boolean first=true;
+}
+{
+ t = <STRING_LITERAL>
+ {
+ // Decode the sting value.
+ String image = t.image;
+ for( int i=1; i < image.length()-1; i++ ) {
+ char c = image.charAt(i);
+ if( c == '\'' )
+ i++;
+ rc.append(c);
+ }
+ return rc.toString();
+ }
+}
+
+PropertyExpression variable() :
+{
+ Token t;
+ PropertyExpression left=null;
+}
+{
+ (
+ t = <ID>
+ {
+ left = new PropertyExpression(t.image);
+ }
+ )
+ {
+ return left;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index b0fbafac56..d8485ef0f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -290,7 +291,7 @@ public class AMQChannel
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -301,7 +302,7 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
new file mode 100644
index 0000000000..0aa5739c1c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
@@ -0,0 +1,216 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.jms.JMSException;
+
+/**
+ * An expression which performs an operation on two expression values
+ *
+ * @version $Revision$
+ */
+public abstract class ArithmeticExpression extends BinaryExpression {
+
+ protected static final int INTEGER = 1;
+ protected static final int LONG = 2;
+ protected static final int DOUBLE = 3;
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ArithmeticExpression(Expression left, Expression right) {
+ super(left, right);
+ }
+
+ public static Expression createPlus(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof String) {
+ String text = (String) lvalue;
+ String answer = text + rvalue;
+ return answer;
+ }
+ else if (lvalue instanceof Number) {
+ return plus((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call plus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "+";
+ }
+ };
+ }
+
+ public static Expression createMinus(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return minus((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call minus operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "-";
+ }
+ };
+ }
+
+ public static Expression createMultiply(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return multiply((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call multiply operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "*";
+ }
+ };
+ }
+
+ public static Expression createDivide(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return divide((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call divide operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "/";
+ }
+ };
+ }
+
+ public static Expression createMod(Expression left, Expression right) {
+ return new ArithmeticExpression(left, right) {
+
+ protected Object evaluate(Object lvalue, Object rvalue) {
+ if (lvalue instanceof Number) {
+ return mod((Number) lvalue, asNumber(rvalue));
+ }
+ throw new RuntimeException("Cannot call mod operation on: " + lvalue + " and: " + rvalue);
+ }
+
+ public String getExpressionSymbol() {
+ return "%";
+ }
+ };
+ }
+
+ protected Number plus(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() + right.intValue());
+ case LONG:
+ return new Long(left.longValue() + right.longValue());
+ default:
+ return new Double(left.doubleValue() + right.doubleValue());
+ }
+ }
+
+ protected Number minus(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() - right.intValue());
+ case LONG:
+ return new Long(left.longValue() - right.longValue());
+ default:
+ return new Double(left.doubleValue() - right.doubleValue());
+ }
+ }
+
+ protected Number multiply(Number left, Number right) {
+ switch (numberType(left, right)) {
+ case INTEGER:
+ return new Integer(left.intValue() * right.intValue());
+ case LONG:
+ return new Long(left.longValue() * right.longValue());
+ default:
+ return new Double(left.doubleValue() * right.doubleValue());
+ }
+ }
+
+ protected Number divide(Number left, Number right) {
+ return new Double(left.doubleValue() / right.doubleValue());
+ }
+
+ protected Number mod(Number left, Number right) {
+ return new Double(left.doubleValue() % right.doubleValue());
+ }
+
+ private int numberType(Number left, Number right) {
+ if (isDouble(left) || isDouble(right)) {
+ return DOUBLE;
+ }
+ else if (left instanceof Long || right instanceof Long) {
+ return LONG;
+ }
+ else {
+ return INTEGER;
+ }
+ }
+
+ private boolean isDouble(Number n) {
+ return n instanceof Float || n instanceof Double;
+ }
+
+ protected Number asNumber(Object value) {
+ if (value instanceof Number) {
+ return (Number) value;
+ }
+ else {
+ throw new RuntimeException("Cannot convert value: " + value + " into a number");
+ }
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object lvalue = left.evaluate(message);
+ if (lvalue == null) {
+ return null;
+ }
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ return evaluate(lvalue, rvalue);
+ }
+
+
+ /**
+ * @param lvalue
+ * @param rvalue
+ * @return
+ */
+ abstract protected Object evaluate(Object lvalue, Object rvalue);
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
new file mode 100644
index 0000000000..4256ab9189
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+
+/**
+ * An expression which performs an operation on two expression values.
+ *
+ * @version $Revision$
+ */
+abstract public class BinaryExpression implements Expression {
+ protected Expression left;
+ protected Expression right;
+
+ public BinaryExpression(Expression left, Expression right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public Expression getLeft() {
+ return left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ abstract public String getExpressionSymbol();
+
+ /**
+ * @param expression
+ */
+ public void setRight(Expression expression) {
+ right = expression;
+ }
+
+ /**
+ * @param expression
+ */
+ public void setLeft(Expression expression) {
+ left = expression;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
new file mode 100644
index 0000000000..b66de3fbc5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import javax.jms.JMSException;
+
+
+/**
+ * A BooleanExpression is an expression that always
+ * produces a Boolean result.
+ *
+ * @version $Revision$
+ */
+public interface BooleanExpression extends Expression {
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws JMSException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
new file mode 100644
index 0000000000..13d278cf65
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
@@ -0,0 +1,465 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.jms.JMSException;
+
+/**
+ * A filter performing a comparison of two objects
+ *
+ * @version $Revision$
+ */
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression {
+
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right) {
+ return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
+ }
+
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) {
+ return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
+ }
+
+ static final private HashSet REGEXP_CONTROL_CHARS = new HashSet();
+
+ static {
+ REGEXP_CONTROL_CHARS.add(new Character('.'));
+ REGEXP_CONTROL_CHARS.add(new Character('\\'));
+ REGEXP_CONTROL_CHARS.add(new Character('['));
+ REGEXP_CONTROL_CHARS.add(new Character(']'));
+ REGEXP_CONTROL_CHARS.add(new Character('^'));
+ REGEXP_CONTROL_CHARS.add(new Character('$'));
+ REGEXP_CONTROL_CHARS.add(new Character('?'));
+ REGEXP_CONTROL_CHARS.add(new Character('*'));
+ REGEXP_CONTROL_CHARS.add(new Character('+'));
+ REGEXP_CONTROL_CHARS.add(new Character('{'));
+ REGEXP_CONTROL_CHARS.add(new Character('}'));
+ REGEXP_CONTROL_CHARS.add(new Character('|'));
+ REGEXP_CONTROL_CHARS.add(new Character('('));
+ REGEXP_CONTROL_CHARS.add(new Character(')'));
+ REGEXP_CONTROL_CHARS.add(new Character(':'));
+ REGEXP_CONTROL_CHARS.add(new Character('&'));
+ REGEXP_CONTROL_CHARS.add(new Character('<'));
+ REGEXP_CONTROL_CHARS.add(new Character('>'));
+ REGEXP_CONTROL_CHARS.add(new Character('='));
+ REGEXP_CONTROL_CHARS.add(new Character('!'));
+ }
+
+ static class LikeExpression extends UnaryExpression implements BooleanExpression {
+
+ Pattern likePattern;
+
+ /**
+ * @param right
+ */
+ public LikeExpression(Expression right, String like, int escape) {
+ super(right);
+
+ StringBuffer regexp = new StringBuffer(like.length() * 2);
+ regexp.append("\\A"); // The beginning of the input
+ for (int i = 0; i < like.length(); i++) {
+ char c = like.charAt(i);
+ if (escape == (0xFFFF & c)) {
+ i++;
+ if (i >= like.length()) {
+ // nothing left to escape...
+ break;
+ }
+
+ char t = like.charAt(i);
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & t));
+ }
+ else if (c == '%') {
+ regexp.append(".*?"); // Do a non-greedy match
+ }
+ else if (c == '_') {
+ regexp.append("."); // match one
+ }
+ else if (REGEXP_CONTROL_CHARS.contains(new Character(c))) {
+ regexp.append("\\x");
+ regexp.append(Integer.toHexString(0xFFFF & c));
+ }
+ else {
+ regexp.append(c);
+ }
+ }
+ regexp.append("\\z"); // The end of the input
+
+ likePattern = Pattern.compile(regexp.toString(), Pattern.DOTALL);
+ }
+
+ /**
+ * org.apache.activemq.filter.UnaryExpression#getExpressionSymbol()
+ */
+ public String getExpressionSymbol() {
+ return "LIKE";
+ }
+
+ /**
+ * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
+ */
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Object rv = this.getRight().evaluate(message);
+
+ if (rv == null) {
+ return null;
+ }
+
+ if (!(rv instanceof String)) {
+ return Boolean.FALSE;
+ //throw new RuntimeException("LIKE can only operate on String identifiers. LIKE attemped on: '" + rv.getClass());
+ }
+
+ return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ }
+
+ public static BooleanExpression createLike(Expression left, String right, String escape) {
+ if (escape != null && escape.length() != 1) {
+ throw new RuntimeException("The ESCAPE string litteral is invalid. It can only be one character. Litteral used: " + escape);
+ }
+ int c = -1;
+ if (escape != null) {
+ c = 0xFFFF & escape.charAt(0);
+ }
+
+ return new LikeExpression(left, right, c);
+ }
+
+ public static BooleanExpression createNotLike(Expression left, String right, String escape) {
+ return UnaryExpression.createNOT(createLike(left, right, escape));
+ }
+
+ public static BooleanExpression createInFilter(Expression left, List elements) {
+
+ if( !(left instanceof PropertyExpression) )
+ throw new RuntimeException("Expected a property for In expression, got: "+left);
+ return UnaryExpression.createInExpression((PropertyExpression)left, elements, false);
+
+ }
+
+ public static BooleanExpression createNotInFilter(Expression left, List elements) {
+
+ if( !(left instanceof PropertyExpression) )
+ throw new RuntimeException("Expected a property for In expression, got: "+left);
+ return UnaryExpression.createInExpression((PropertyExpression)left, elements, true);
+
+ }
+
+ public static BooleanExpression createIsNull(Expression left) {
+ return doCreateEqual(left, ConstantExpression.NULL);
+ }
+
+ public static BooleanExpression createIsNotNull(Expression left) {
+ return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
+ }
+
+ public static BooleanExpression createNotEqual(Expression left, Expression right) {
+ return UnaryExpression.createNOT(createEqual(left, right));
+ }
+
+ public static BooleanExpression createEqual(Expression left, Expression right) {
+ checkEqualOperand(left);
+ checkEqualOperand(right);
+ checkEqualOperandCompatability(left, right);
+ return doCreateEqual(left, right);
+ }
+
+ private static BooleanExpression doCreateEqual(Expression left, Expression right) {
+ return new ComparisonExpression(left, right) {
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object lv = left.evaluate(message);
+ Object rv = right.evaluate(message);
+
+ // Iff one of the values is null
+ if (lv == null ^ rv == null) {
+ return Boolean.FALSE;
+ }
+ if (lv == rv || lv.equals(rv)) {
+ return Boolean.TRUE;
+ }
+ if( lv instanceof Comparable && rv instanceof Comparable ) {
+ return compare((Comparable)lv, (Comparable)rv);
+ }
+ return Boolean.FALSE;
+ }
+
+ protected boolean asBoolean(int answer) {
+ return answer == 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "=";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ protected boolean asBoolean(int answer) {
+ return answer > 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">";
+ }
+ };
+ }
+
+ public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ protected boolean asBoolean(int answer) {
+ return answer >= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">=";
+ }
+ };
+ }
+
+ public static BooleanExpression createLessThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ protected boolean asBoolean(int answer) {
+ return answer < 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<";
+ }
+
+ };
+ }
+
+ public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ protected boolean asBoolean(int answer) {
+ return answer <= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<=";
+ }
+ };
+ }
+
+ /**
+ * Only Numeric expressions can be used in >, >=, < or <= expressions.s
+ *
+ * @param expr
+ */
+ public static void checkLessThanOperand(Expression expr ) {
+ if( expr instanceof ConstantExpression ) {
+ Object value = ((ConstantExpression)expr).getValue();
+ if( value instanceof Number )
+ return;
+
+ // Else it's boolean or a String..
+ throw new RuntimeException("Value '"+expr+"' cannot be compared.");
+ }
+ if( expr instanceof BooleanExpression ) {
+ throw new RuntimeException("Value '"+expr+"' cannot be compared.");
+ }
+ }
+
+ /**
+ * Validates that the expression can be used in == or <> expression.
+ * Cannot not be NULL TRUE or FALSE litterals.
+ *
+ * @param expr
+ */
+ public static void checkEqualOperand(Expression expr ) {
+ if( expr instanceof ConstantExpression ) {
+ Object value = ((ConstantExpression)expr).getValue();
+ if( value == null )
+ throw new RuntimeException("'"+expr+"' cannot be compared.");
+ }
+ }
+
+ /**
+ *
+ * @param left
+ * @param right
+ */
+ private static void checkEqualOperandCompatability(Expression left, Expression right) {
+ if( left instanceof ConstantExpression && right instanceof ConstantExpression ) {
+ if( left instanceof BooleanExpression && !(right instanceof BooleanExpression) )
+ throw new RuntimeException("'"+left+"' cannot be compared with '"+right+"'");
+ }
+ }
+
+
+
+ /**
+ * @param left
+ * @param right
+ */
+ public ComparisonExpression(Expression left, Expression right) {
+ super(left, right);
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Comparable lv = (Comparable) left.evaluate(message);
+ if (lv == null) {
+ return null;
+ }
+ Comparable rv = (Comparable) right.evaluate(message);
+ if (rv == null) {
+ return null;
+ }
+ return compare(lv, rv);
+ }
+
+ protected Boolean compare(Comparable lv, Comparable rv) {
+ Class lc = lv.getClass();
+ Class rc = rv.getClass();
+ // If the the objects are not of the same type,
+ // try to convert up to allow the comparison.
+ if (lc != rc) {
+ if (lc == Byte.class) {
+ if (rc == Short.class) {
+ lv = new Short(((Number) lv).shortValue());
+ }
+ else if (rc == Integer.class) {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Short.class) {
+ if (rc == Integer.class) {
+ lv = new Integer(((Number) lv).intValue());
+ }
+ else if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ } else if (lc == Integer.class) {
+ if (rc == Long.class) {
+ lv = new Long(((Number) lv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Long.class) {
+ if (rc == Integer.class) {
+ rv = new Long(((Number) rv).longValue());
+ }
+ else if (rc == Float.class) {
+ lv = new Float(((Number) lv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Float.class) {
+ if (rc == Integer.class) {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Long.class) {
+ rv = new Float(((Number) rv).floatValue());
+ }
+ else if (rc == Double.class) {
+ lv = new Double(((Number) lv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else if (lc == Double.class) {
+ if (rc == Integer.class) {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Long.class) {
+ rv = new Double(((Number) rv).doubleValue());
+ }
+ else if (rc == Float.class) {
+ rv = new Float(((Number) rv).doubleValue());
+ }
+ else {
+ return Boolean.FALSE;
+ }
+ }
+ else
+ return Boolean.FALSE;
+ }
+ return asBoolean(lv.compareTo(rv)) ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ protected abstract boolean asBoolean(int answer);
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
new file mode 100644
index 0000000000..9bde712da2
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
@@ -0,0 +1,170 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import java.math.BigDecimal;
+
+import javax.jms.JMSException;
+
+/**
+ * Represents a constant expression
+ *
+ * @version $Revision$
+ */
+public class ConstantExpression implements Expression {
+
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
+ public BooleanConstantExpression(Object value) {
+ super(value);
+ }
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ }
+
+ public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
+ public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
+ public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
+
+ private Object value;
+
+ public static ConstantExpression createFromDecimal(String text) {
+
+ // Strip off the 'l' or 'L' if needed.
+ if( text.endsWith("l") || text.endsWith("L") )
+ text = text.substring(0, text.length()-1);
+
+ Number value;
+ try {
+ value = new Long(text);
+ } catch ( NumberFormatException e) {
+ // The number may be too big to fit in a long.
+ value = new BigDecimal(text);
+ }
+
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromHex(String text) {
+ Number value = new Long(Long.parseLong(text.substring(2), 16));
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFromOctal(String text) {
+ Number value = new Long(Long.parseLong(text, 8));
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ value = new Integer(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFloat(String text) {
+ Number value = new Double(text);
+ return new ConstantExpression(value);
+ }
+
+ public ConstantExpression(Object value) {
+ this.value = value;
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ return value;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ if (value == null) {
+ return "NULL";
+ }
+ if (value instanceof Boolean) {
+ return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
+ }
+ if (value instanceof String) {
+ return encodeString((String) value);
+ }
+ return value.toString();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+
+ /**
+ * Encodes the value of string so that it looks like it would look like
+ * when it was provided in a selector.
+ *
+ * @param s
+ * @return
+ */
+ public static String encodeString(String s) {
+ StringBuffer b = new StringBuffer();
+ b.append('\'');
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c == '\'') {
+ b.append(c);
+ }
+ b.append(c);
+ }
+ b.append('\'');
+ return b.toString();
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
new file mode 100644
index 0000000000..a15c15fb91
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import javax.jms.JMSException;
+
+
+/**
+ * Represents an expression
+ *
+ * @version $Revision$
+ */
+public interface Expression {
+
+ /**
+ * @return the value of this expression
+ */
+ public Object evaluate(AMQMessage message) throws JMSException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
new file mode 100644
index 0000000000..c82de9fa15
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+public interface FilterManager
+{
+ void add(MessageFilter filter);
+
+ void remove(MessageFilter filter);
+
+ boolean allAllow(AMQMessage msg);
+
+ boolean hasFilters();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
new file mode 100644
index 0000000000..6ecd56586f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+
+public class FilterManagerFactory
+{
+ //private final static Logger _logger = LoggerFactory.getLogger(FilterManagerFactory.class);
+ private final static org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(FilterManagerFactory.class);
+
+ //fixme move to a common class so it can be refered to from client code.
+ private static String JMS_SELECTOR_FILTER = "x-filter-jms-selector";
+
+ public static FilterManager createManager(FieldTable filters) throws AMQException
+ {
+ FilterManager manager = null;
+
+ if (filters != null)
+ {
+
+ manager = new SimpleFilterManager();
+
+ Iterator it = filters.keySet().iterator();
+ _logger.info("Processing filters:");
+ while (it.hasNext())
+ {
+ String key = (String) it.next();
+ _logger.info("filter:" + key);
+ if (key.equals(JMS_SELECTOR_FILTER))
+ {
+ String selector = (String) filters.get(key);
+
+ if (selector != null && !selector.equals(""))
+ {
+ manager.add(new JMSSelectorFilter(selector));
+ }
+ }
+
+ }
+
+ //If we added no filters don't bear the overhead of having an filter manager
+ if (!manager.hasFilters())
+ {
+ manager = null;
+ }
+ }
+ else
+ {
+ _logger.info("No Filters found.");
+ }
+
+
+ return manager;
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
new file mode 100644
index 0000000000..4884067237
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.filter.jms.selector.SelectorParser;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.log4j.Logger;
+
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+public class JMSSelectorFilter implements MessageFilter
+{
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
+
+ private String _selector;
+ private BooleanExpression _matcher;
+
+ public JMSSelectorFilter(String selector) throws AMQException
+ {
+ _selector = selector;
+ _logger.info("Created JMSSelectorFilter with selector:" + _selector);
+
+
+ try
+ {
+ _matcher = new SelectorParser().parse(selector);
+ }
+ catch (InvalidSelectorException e)
+ {
+ // fixme
+ // Is this the correct way of throwing exception
+ throw new AMQInvalidSelectorException(e.getMessage());
+ }
+
+ }
+
+ public boolean matches(AMQMessage message)
+ {
+ try
+ {
+ boolean match = _matcher.matches(message);
+ _logger.info(message + " match(" + match + ") selector(" + System.identityHashCode(_selector) + "):" + _selector);
+ return match;
+ }
+ catch (JMSException e)
+ {
+ //fixme this needs to be sorted.. it shouldn't happen
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return false;
+ }
+
+ public String getSelector()
+ {
+ return _selector;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
new file mode 100644
index 0000000000..714d8c23f5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import javax.jms.JMSException;
+
+/**
+ * A filter performing a comparison of two objects
+ *
+ * @version $Revision$
+ */
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression {
+
+ public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) {
+ return new LogicExpression(lvalue, rvalue) {
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+ // Can we do an OR shortcut??
+ if (lv !=null && lv.booleanValue()) {
+ return Boolean.TRUE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+ return rv==null ? null : rv;
+ }
+
+ public String getExpressionSymbol() {
+ return "OR";
+ }
+ };
+ }
+
+ public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) {
+ return new LogicExpression(lvalue, rvalue) {
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Boolean lv = (Boolean) left.evaluate(message);
+
+ // Can we do an AND shortcut??
+ if (lv == null)
+ return null;
+ if (!lv.booleanValue()) {
+ return Boolean.FALSE;
+ }
+
+ Boolean rv = (Boolean) right.evaluate(message);
+ return rv == null ? null : rv;
+ }
+
+ public String getExpressionSymbol() {
+ return "AND";
+ }
+ };
+ }
+
+ /**
+ * @param left
+ * @param right
+ */
+ public LogicExpression(BooleanExpression left, BooleanExpression right) {
+ super(left, right);
+ }
+
+ abstract public Object evaluate(AMQMessage message) throws JMSException;
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
new file mode 100644
index 0000000000..b8ca75d209
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.jms.JMSException;
+
+public interface MessageFilter
+{
+ boolean matches(AMQMessage message) throws JMSException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
new file mode 100644
index 0000000000..f3e9965c2e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -0,0 +1,305 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+
+//import org.apache.activemq.command.ActiveMQDestination;
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.command.TransactionId;
+//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.log4j.Logger;
+
+/**
+ * Represents a property expression
+ *
+ * @version $Revision$
+ */
+public class PropertyExpression implements Expression
+{
+
+ interface SubExpression
+ {
+ public Object evaluate(AMQMessage message);
+ }
+
+ interface JMSExpression
+ {
+ public abstract Object evaluate(JMSMessage message);
+ }
+
+ static class SubJMSExpression implements SubExpression
+ {
+ JMSExpression _expression;
+
+ SubJMSExpression(JMSExpression expression)
+ {
+ _expression = expression;
+ }
+
+
+ public Object evaluate(AMQMessage message)
+ {
+ JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
+ if (msg != null)
+ {
+ return _expression.evaluate(msg);
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }
+
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
+
+
+ static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap();
+
+ static
+ {
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSDestination();
+ }
+ }
+ ));
+//
+// public Object evaluate(AMQMessage message)
+// {
+// //fixme
+//
+//
+//// AMQDestination dest = message.getOriginalDestination();
+//// if (dest == null)
+//// {
+//// dest = message.getDestination();
+//// }
+//// if (dest == null)
+//// {
+//// return null;
+//// }
+//// return dest.toString();
+// return "";
+// }
+// });
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSReplyTo();
+ }
+ })
+ );
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSType();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ try
+ {
+ Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ _logger.info("JMSDeliveryMode is :" + mode);
+ return mode;
+ }
+ catch (AMQException e)
+ {
+ //shouldn't happen
+ }
+
+ return DeliveryMode.NON_PERSISTENT;
+ }
+ }));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSPriority();
+ }
+ }
+ ));
+
+
+ JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getAMQMessage().getMessageId();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSTimestamp();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSCorrelationID();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getJMSExpiration();
+ }
+ }
+ ));
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression(
+ new JMSExpression()
+ {
+ public Object evaluate(JMSMessage message)
+ {
+ return message.getAMQMessage().isRedelivered();
+ }
+ }
+ ));
+
+ }
+
+ private final String name;
+ private final SubExpression jmsPropertyExpression;
+
+ public PropertyExpression(String name)
+ {
+ this.name = name;
+ jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException
+ {
+// try
+// {
+// if (message.isDropped())
+// {
+// return null;
+// }
+
+ if (jmsPropertyExpression != null)
+ {
+ return jmsPropertyExpression.evaluate(message);
+ }
+// try
+ else
+ {
+
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+
+ _logger.info("Looking up property:" + name);
+ _logger.info("Properties are:" + _properties.getHeaders().keySet());
+
+ return _properties.getHeaders().get(name);
+ }
+// catch (IOException ioe)
+// {
+// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage());
+// exception.initCause(ioe);
+// throw exception;
+// }
+// }
+// catch (IOException e)
+// {
+// JMSException exception = new JMSException(e.getMessage());
+// exception.initCause(e);
+// throw exception;
+// }
+
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return name;
+ }
+
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode()
+ {
+ return name.hashCode();
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o)
+ {
+
+ if (o == null || !this.getClass().equals(o.getClass()))
+ {
+ return false;
+ }
+ return name.equals(((PropertyExpression) o).name);
+
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
new file mode 100644
index 0000000000..dc2c2c0e6c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class SimpleFilterManager implements FilterManager
+{
+ private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class);
+
+ private final ConcurrentLinkedQueue<MessageFilter> _filters;
+
+ public SimpleFilterManager()
+ {
+ _logger.debug("Creating SimpleFilterManager");
+ _filters = new ConcurrentLinkedQueue<MessageFilter>();
+ }
+
+ public void add(MessageFilter filter)
+ {
+ _filters.add(filter);
+ }
+
+ public void remove(MessageFilter filter)
+ {
+ _filters.remove(filter);
+ }
+
+ public boolean allAllow(AMQMessage msg)
+ {
+ for (MessageFilter filter : _filters)
+ {
+ try
+ {
+ if (!filter.matches(msg))
+ {
+ return false;
+ }
+ }
+ catch (JMSException e)
+ {
+ //fixme
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean hasFilters()
+ {
+ return !_filters.isEmpty();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
new file mode 100644
index 0000000000..49ff147411
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.jms.JMSMessage;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+/**
+ * An expression which performs an operation on two expression values
+ *
+ * @version $Revision$
+ */
+public abstract class UnaryExpression implements Expression {
+
+ private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
+ protected Expression right;
+
+ public static Expression createNegate(Expression left) {
+ return new UnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ if (rvalue instanceof Number) {
+ return negate((Number) rvalue);
+ }
+ return null;
+ }
+
+ public String getExpressionSymbol() {
+ return "-";
+ }
+ };
+ }
+
+ public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) {
+
+ // Use a HashSet if there are many elements.
+ Collection t;
+ if( elements.size()==0 )
+ t=null;
+ else if( elements.size() < 5 )
+ t = elements;
+ else {
+ t = new HashSet(elements);
+ }
+ final Collection inList = t;
+
+ return new BooleanUnaryExpression(right) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null) {
+ return null;
+ }
+ if( rvalue.getClass()!=String.class )
+ return null;
+
+ if( (inList!=null && inList.contains(rvalue)) ^ not ) {
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+
+ }
+
+ public String toString() {
+ StringBuffer answer = new StringBuffer();
+ answer.append(right);
+ answer.append(" ");
+ answer.append(getExpressionSymbol());
+ answer.append(" ( ");
+
+ int count=0;
+ for (Iterator i = inList.iterator(); i.hasNext();) {
+ Object o = (Object) i.next();
+ if( count!=0 ) {
+ answer.append(", ");
+ }
+ answer.append(o);
+ count++;
+ }
+
+ answer.append(" )");
+ return answer.toString();
+ }
+
+ public String getExpressionSymbol() {
+ if( not )
+ return "NOT IN";
+ else
+ return "IN";
+ }
+ };
+ }
+
+ abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression {
+ public BooleanUnaryExpression(Expression left) {
+ super(left);
+ }
+
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+ };
+
+
+ public static BooleanExpression createNOT(BooleanExpression left) {
+ return new BooleanUnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Boolean lvalue = (Boolean) right.evaluate(message);
+ if (lvalue == null) {
+ return null;
+ }
+ return lvalue.booleanValue() ? Boolean.FALSE : Boolean.TRUE;
+ }
+
+ public String getExpressionSymbol() {
+ return "NOT";
+ }
+ };
+ }
+
+ public static BooleanExpression createXPath(final String xpath) {
+ return new XPathExpression(xpath);
+ }
+
+ public static BooleanExpression createXQuery(final String xpath) {
+ return new XQueryExpression(xpath);
+ }
+
+ public static BooleanExpression createBooleanCast(Expression left) {
+ return new BooleanUnaryExpression(left) {
+ public Object evaluate(AMQMessage message) throws JMSException {
+ Object rvalue = right.evaluate(message);
+ if (rvalue == null)
+ return null;
+ if (!rvalue.getClass().equals(Boolean.class))
+ return Boolean.FALSE;
+ return ((Boolean)rvalue).booleanValue() ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ public String toString() {
+ return right.toString();
+ }
+
+ public String getExpressionSymbol() {
+ return "";
+ }
+ };
+ }
+
+ private static Number negate(Number left) {
+ Class clazz = left.getClass();
+ if (clazz == Integer.class) {
+ return new Integer(-left.intValue());
+ }
+ else if (clazz == Long.class) {
+ return new Long(-left.longValue());
+ }
+ else if (clazz == Float.class) {
+ return new Float(-left.floatValue());
+ }
+ else if (clazz == Double.class) {
+ return new Double(-left.doubleValue());
+ }
+ else if (clazz == BigDecimal.class) {
+ // We ussually get a big deciamal when we have Long.MIN_VALUE constant in the
+ // Selector. Long.MIN_VALUE is too big to store in a Long as a positive so we store it
+ // as a Big decimal. But it gets Negated right away.. to here we try to covert it back
+ // to a Long.
+ BigDecimal bd = (BigDecimal)left;
+ bd = bd.negate();
+
+ if( BD_LONG_MIN_VALUE.compareTo(bd)==0 ) {
+ return new Long(Long.MIN_VALUE);
+ }
+ return bd;
+ }
+ else {
+ throw new RuntimeException("Don't know how to negate: "+left);
+ }
+ }
+
+ public UnaryExpression(Expression left) {
+ this.right = left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+ public void setRight(Expression expression) {
+ right = expression;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return "(" + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * TODO: more efficient hashCode()
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ abstract public String getExpressionSymbol();
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
new file mode 100644
index 0000000000..ab952b6fea
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import javax.jms.JMSException;
+
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.server.queue.AMQMessage;
+
+/**
+ * Used to evaluate an XPath Expression in a JMS selector.
+ */
+public final class XPathExpression implements BooleanExpression {
+
+ private static final Log log = LogFactory.getLog(XPathExpression.class);
+ private static final String EVALUATOR_SYSTEM_PROPERTY = "org.apache.qpid.server.filter.XPathEvaluatorClassName";
+ private static final String DEFAULT_EVALUATOR_CLASS_NAME=XalanXPathEvaluator.class.getName();
+
+ private static final Constructor EVALUATOR_CONSTRUCTOR;
+
+ static {
+ String cn = System.getProperty(EVALUATOR_SYSTEM_PROPERTY, DEFAULT_EVALUATOR_CLASS_NAME);
+ Constructor m = null;
+ try {
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e) {
+ log.warn("Invalid "+XPathEvaluator.class.getName()+" implementation: "+cn+", reason: "+e,e);
+ cn = DEFAULT_EVALUATOR_CLASS_NAME;
+ try {
+ m = getXPathEvaluatorConstructor(cn);
+ } catch (Throwable e2) {
+ log.error("Default XPath evaluator could not be loaded",e);
+ }
+ }
+ } finally {
+ EVALUATOR_CONSTRUCTOR = m;
+ }
+ }
+
+ private static Constructor getXPathEvaluatorConstructor(String cn) throws ClassNotFoundException, SecurityException, NoSuchMethodException {
+ Class c = XPathExpression.class.getClassLoader().loadClass(cn);
+ if( !XPathEvaluator.class.isAssignableFrom(c) ) {
+ throw new ClassCastException(""+c+" is not an instance of "+XPathEvaluator.class);
+ }
+ return c.getConstructor(new Class[]{String.class});
+ }
+
+ private final String xpath;
+ private final XPathEvaluator evaluator;
+
+ static public interface XPathEvaluator {
+ public boolean evaluate(AMQMessage message) throws JMSException;
+ }
+
+ XPathExpression(String xpath) {
+ this.xpath = xpath;
+ this.evaluator = createEvaluator(xpath);
+ }
+
+ private XPathEvaluator createEvaluator(String xpath2) {
+ try {
+ return (XPathEvaluator)EVALUATOR_CONSTRUCTOR.newInstance(new Object[]{xpath});
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if( cause instanceof RuntimeException ) {
+ throw (RuntimeException)cause;
+ }
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ } catch (Throwable e) {
+ throw new RuntimeException("Invalid XPath Expression: "+xpath+" reason: "+e.getMessage(), e);
+ }
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+// try {
+//FIXME this is flow to disk work
+// if( message.isDropped() )
+// return null;
+ return evaluator.evaluate(message) ? Boolean.TRUE : Boolean.FALSE;
+// } catch (IOException e) {
+//
+// JMSException exception = new JMSException(e.getMessage());
+// exception.initCause(e);
+// throw exception;
+//
+// }
+
+ }
+
+ public String toString() {
+ return "XPATH "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
new file mode 100644
index 0000000000..53764cbf75
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.jms.JMSException;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+/**
+ * Used to evaluate an XQuery Expression in a JMS selector.
+ */
+public final class XQueryExpression implements BooleanExpression {
+ private final String xpath;
+
+ XQueryExpression(String xpath) {
+ super();
+ this.xpath = xpath;
+ }
+
+ public Object evaluate(AMQMessage message) throws JMSException {
+ return Boolean.FALSE;
+ }
+
+ public String toString() {
+ return "XQUERY "+ConstantExpression.encodeString(xpath);
+ }
+
+ /**
+ * @param message
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws JMSException
+ */
+ public boolean matches(AMQMessage message) throws JMSException {
+ Object object = evaluate(message);
+ return object!=null && object==Boolean.TRUE;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
new file mode 100644
index 0000000000..4b78fd18df
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.filter;
+//
+// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
+//
+
+import java.io.StringReader;
+import java.io.ByteArrayInputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+//import org.apache.activemq.command.Message;
+//import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.xpath.CachedXPathAPI;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.w3c.dom.Document;
+import org.w3c.dom.traversal.NodeIterator;
+import org.xml.sax.InputSource;
+
+public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
+
+ private final String xpath;
+
+ public XalanXPathEvaluator(String xpath) {
+ this.xpath = xpath;
+ }
+
+ public boolean evaluate(AMQMessage m) throws JMSException {
+ if( m instanceof TextMessage ) {
+ String text = ((TextMessage)m).getText();
+ return evaluate(text);
+ } else if ( m instanceof BytesMessage ) {
+ BytesMessage bm = (BytesMessage) m;
+ byte data[] = new byte[(int) bm.getBodyLength()];
+ bm.readBytes(data);
+ return evaluate(data);
+ }
+ return false;
+ }
+
+ private boolean evaluate(byte[] data) {
+ try {
+
+ InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+
+ private boolean evaluate(String text) {
+ try {
+ InputSource inputSource = new InputSource(new StringReader(text));
+
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ DocumentBuilder dbuilder = factory.newDocumentBuilder();
+ Document doc = dbuilder.parse(inputSource);
+
+ // We should associated the cachedXPathAPI object with the message being evaluated
+ // since that should speedup subsequent xpath expressions.
+ CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
+ NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,xpath);
+ return iterator.nextNode()!=null;
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index d4c94061a0..bf282020ee 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -21,10 +21,12 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -68,14 +71,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
- if(queue == null)
+ if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck);
- if(!body.nowait)
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+ if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
}
@@ -83,10 +86,19 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
}
- catch(ConsumerTagNotUniqueException e)
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
+ }
+ catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
+ BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
new file mode 100644
index 0000000000..aba3b88a59
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageDecorator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.message;
+
+public interface MessageDecorator
+{
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
new file mode 100644
index 0000000000..72e241ea0a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.message.jms;
+
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import javax.jms.MessageNotWriteableException;
+import java.util.Enumeration;
+
+public class JMSMessage implements MessageDecorator
+{
+
+ private AMQMessage _message;
+ private BasicContentHeaderProperties _properties;
+
+ public JMSMessage(AMQMessage message)
+ {
+ _message = message;
+ ContentHeaderBody contentHeader = message.getContentHeaderBody();
+ _properties = (BasicContentHeaderProperties) contentHeader.properties;
+ }
+
+ protected void checkWriteable() throws MessageNotWriteableException
+ {
+ //The broker should not modify a message.
+// if (_readableMessage)
+ {
+ throw new MessageNotWriteableException("The broker should not modify a message.");
+ }
+ }
+
+
+ public String getJMSMessageID()
+ {
+ return _properties.getMessageId();
+ }
+
+ public void setJMSMessageID(String string) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setMessageId(string);
+ }
+
+ public long getJMSTimestamp()
+ {
+ return _properties.getTimestamp();
+ }
+
+ public void setJMSTimestamp(long l) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setTimestamp(l);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes()
+ {
+ return _properties.getCorrelationId().getBytes();
+ }
+
+// public void setJMSCorrelationIDAsBytes(byte[] bytes)
+// {
+// }
+
+ public void setJMSCorrelationID(String string) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setCorrelationId(string);
+ }
+
+ public String getJMSCorrelationID()
+ {
+ return _properties.getCorrelationId();
+ }
+
+ public String getJMSReplyTo()
+ {
+ return _properties.getReplyTo();
+ }
+
+ public void setJMSReplyTo(Destination destination) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setReplyTo(destination.toString());
+ }
+
+ public String getJMSDestination()
+ {
+ //fixme should be a deestination
+ return "";
+ }
+
+ public void setJMSDestination(Destination destination) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ //_properties.setDestination(destination.toString());
+ }
+
+ public int getJMSDeliveryMode()
+ {
+ return _properties.getDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(byte i) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setDeliveryMode(i);
+ }
+
+ public boolean getJMSRedelivered()
+ {
+ return _message.isRedelivered();
+ }
+
+ public void setJMSRedelivered(boolean b) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _message.setRedelivered(b);
+ }
+
+ public String getJMSType()
+ {
+ return _properties.getType();
+ }
+
+ public void setJMSType(String string) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setType(string);
+ }
+
+ public long getJMSExpiration()
+ {
+ return _properties.getExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setExpiration(l);
+ }
+
+ public int getJMSPriority()
+ {
+ return _properties.getPriority();
+ }
+
+ public void setJMSPriority(byte i) throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.setPriority(i);
+ }
+
+ public void clearProperties() throws MessageNotWriteableException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().clear();
+ }
+
+ public boolean propertyExists(String string)
+ {
+ return _properties.getJMSHeaders().propertyExists(string);
+ }
+
+ public boolean getBooleanProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getBoolean(string);
+ }
+
+ public byte getByteProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getByte(string);
+ }
+
+ public short getShortProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getShort(string);
+ }
+
+ public int getIntProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getInteger(string);
+ }
+
+ public long getLongProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getLong(string);
+ }
+
+ public float getFloatProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getFloat(string);
+ }
+
+ public double getDoubleProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getDouble(string);
+ }
+
+ public String getStringProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getString(string);
+ }
+
+ public Object getObjectProperty(String string) throws JMSException
+ {
+ return _properties.getJMSHeaders().getObject(string);
+ }
+
+ public Enumeration getPropertyNames()
+ {
+ return _properties.getJMSHeaders().getPropertyNames();
+ }
+
+ public void setBooleanProperty(String string, boolean b) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setBoolean(string, b);
+ }
+
+ public void setByteProperty(String string, byte b) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setByte(string, b);
+ }
+
+ public void setShortProperty(String string, short i) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setShort(string, i);
+ }
+
+ public void setIntProperty(String string, int i) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setInteger(string, i);
+ }
+
+ public void setLongProperty(String string, long l) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setLong(string, l);
+ }
+
+ public void setFloatProperty(String string, float v) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setFloat(string, v);
+ }
+
+ public void setDoubleProperty(String string, double v) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setDouble(string, v);
+ }
+
+ public void setStringProperty(String string, String string1) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setString(string, string1);
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException
+ {
+ checkWriteable();
+ _properties.getJMSHeaders().setObject(string, object);
+ }
+
+ public void acknowledge() throws MessageNotWriteableException
+ {
+ checkWriteable();
+ }
+
+ public void clearBody() throws MessageNotWriteableException
+ {
+ checkWriteable();
+ }
+
+ public AMQMessage getAMQMessage()
+ {
+ return _message;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 12e06b31ed..b27cd807c0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,6 +25,8 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.message.MessageDecorator;
+import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
import java.util.ArrayList;
@@ -33,17 +35,21 @@ import java.util.LinkedList;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Combines the information that make up a deliverable message into a more manageable form.
*/
public class AMQMessage
{
+ public static final String JMS_MESSAGE = "jms.message";
+
private final Set<Object> _tokens = new HashSet<Object>();
private AMQProtocolSession _publisher;
- private final BasicPublishBody _publishBody;
+ private final BasicPublishBody _publishBody;
private ContentHeaderBody _contentHeaderBody;
@@ -83,6 +89,8 @@ public class AMQMessage
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
+ private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+ private AtomicBoolean _taken;
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -96,7 +104,9 @@ public class AMQMessage
_publishBody = publishBody;
_store = messageStore;
_contentBodies = new LinkedList<ContentBody>();
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
+ _taken = new AtomicBoolean(false);
}
public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
@@ -107,6 +117,7 @@ public class AMQMessage
_publishBody = publishBody;
_contentHeaderBody = contentHeaderBody;
_contentBodies = contentBodies;
+ _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_messageId = messageId;
_store = store;
storeMessage();
@@ -271,7 +282,7 @@ public class AMQMessage
{
_store.removeMessage(_messageId);
}
- catch(AMQException e)
+ catch (AMQException e)
{
//to maintain consistency, we revert the count
incrementReference();
@@ -292,7 +303,7 @@ public class AMQMessage
public boolean checkToken(Object token)
{
- if(_tokens.contains(token))
+ if (_tokens.contains(token))
{
return true;
}
@@ -308,7 +319,7 @@ public class AMQMessage
//if the message is not persistent or the queue is not durable
//we will not need to recover the association and so do not
//need to record it
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.enqueueMessage(queue.getName(), _messageId);
}
@@ -318,7 +329,7 @@ public class AMQMessage
{
//only record associations where both queue and message will survive
//a restart, so only need to remove association if this is the case
- if(isPersistent() && queue.isDurable())
+ if (isPersistent() && queue.isDurable())
{
_store.dequeueMessage(queue.getName(), _messageId);
}
@@ -326,14 +337,14 @@ public class AMQMessage
public boolean isPersistent() throws AMQException
{
- if(_contentHeaderBody == null)
+ if (_contentHeaderBody == null)
{
throw new AMQException("Cannot determine delivery mode of message. Content header not found.");
}
//todo remove literal values to a constant file such as AMQConstants in common
return _contentHeaderBody.properties instanceof BasicContentHeaderProperties
- &&((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ && ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
}
public void setTxnBuffer(TxnBuffer buffer)
@@ -352,8 +363,9 @@ public class AMQMessage
* immediate delivery but has not been marked as delivered to a
* consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException{
- if(isImmediate() && !_deliveredToConsumer)
+ public void checkDeliveredToConsumer() throws NoConsumersException
+ {
+ if (isImmediate() && !_deliveredToConsumer)
{
throw new NoConsumersException(_publishBody, _contentHeaderBody, _contentBodies);
}
@@ -362,8 +374,64 @@ public class AMQMessage
/**
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
+ * And by selectors to determin if the message has already been sent
*/
- public void setDeliveredToConsumer(){
+ public void setDeliveredToConsumer()
+ {
_deliveredToConsumer = true;
}
+
+ /**
+ * Called selectors to determin if the message has already been sent
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return _deliveredToConsumer;
+ }
+
+
+ public MessageDecorator getDecodedMessage(String type)
+ {
+ MessageDecorator msgtype = null;
+
+ if (_decodedMessages != null)
+ {
+ msgtype = _decodedMessages.get(type);
+
+ if (msgtype == null)
+ {
+ msgtype = decorateMessage(type);
+ }
+ }
+
+ return msgtype;
+ }
+
+ private MessageDecorator decorateMessage(String type)
+ {
+ MessageDecorator msgdec = null;
+
+ if (type.equals(JMS_MESSAGE))
+ {
+ msgdec = new JMSMessage(this);
+ }
+
+ if (msgdec != null)
+ {
+ _decodedMessages.put(type, msgdec);
+ }
+
+ return msgdec;
+ }
+
+ public boolean taken()
+ {
+ return _taken.getAndSet(true);
+ }
+
+ public void release()
+ {
+ _taken.set(false);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index f2ef97cf9a..e64daef690 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -187,16 +188,29 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
- //fixme - Pick one.
- if (Boolean.getBoolean("concurrentdeliverymanager"))
+ //fixme - Make this configurable via the broker config.xml
+ if (System.getProperties().getProperty("deliverymanager") != null)
{
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ }
+ else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+ {
+ _logger.info("Using ConcurrentDeliveryManager");
+ _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ }
+ else
+ {
+ _logger.info("Using SynchronizedDeliveryManager");
+ _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ }
}
else
{
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
}
@@ -348,12 +362,12 @@ public class AMQQueue implements Managable, Comparable
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
_subscribers.addSubscriber(subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
new file mode 100644
index 0000000000..d8bb6e1948
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -0,0 +1,352 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Manages delivery of messages on behalf of a queue
+ */
+public class ConcurrentSelectorDeliveryManager implements DeliveryManager
+{
+ private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
+
+ @Configured(path = "advanced.compressBufferOnQueue",
+ defaultValue = "false")
+ public boolean compressBufferOnQueue;
+ /**
+ * Holds any queued messages
+ */
+ private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ //private int _messageCount;
+ /**
+ * Ensures that only one asynchronous task is running for this manager at
+ * any time.
+ */
+ private final AtomicBoolean _processing = new AtomicBoolean();
+ /**
+ * The subscriptions on the queue to whom messages are delivered
+ */
+ private final SubscriptionManager _subscriptions;
+
+ /**
+ * A reference to the queue we are delivering messages for. We need this to be able
+ * to pass the code that handles acknowledgements a handle on the queue.
+ */
+ private final AMQQueue _queue;
+
+
+ /**
+ * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
+ * via the async thread.
+ * <p/>
+ * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ */
+ private ReentrantLock _lock = new ReentrantLock();
+
+
+ ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
+ {
+
+ //Set values from configuration
+ Configurator.configure(this);
+
+ if (compressBufferOnQueue)
+ {
+ _log.warn("Compressing Buffers on queue.");
+ }
+
+ _subscriptions = subscriptions;
+ _queue = queue;
+ }
+
+
+ private boolean addMessageToQueue(AMQMessage msg)
+ {
+ // Shrink the ContentBodies to their actual size to save memory.
+ if (compressBufferOnQueue)
+ {
+ Iterator it = msg.getContentBodies().iterator();
+ while (it.hasNext())
+ {
+ ContentBody cb = (ContentBody) it.next();
+ cb.reduceBufferToFit();
+ }
+ }
+
+ _messages.offer(msg);
+
+ return true;
+ }
+
+
+ public boolean hasQueuedMessages()
+ {
+ _lock.lock();
+ try
+ {
+ return !_messages.isEmpty();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public int getQueueMessageCount()
+ {
+ return getMessageCount();
+ }
+
+ /**
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+ * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ *
+ * @return int the number of messages in the delivery queue.
+ */
+ private int getMessageCount()
+ {
+ return _messages.size();
+ }
+
+
+ public synchronized List<AMQMessage> getMessages()
+ {
+ return new ArrayList<AMQMessage>(_messages);
+ }
+
+ public synchronized void removeAMessageFromTop() throws AMQException
+ {
+ AMQMessage msg = poll();
+ if (msg != null)
+ {
+ msg.dequeue(_queue);
+ }
+ }
+
+ public synchronized void clearAllMessages() throws AMQException
+ {
+ AMQMessage msg = poll();
+ while (msg != null)
+ {
+ msg.dequeue(_queue);
+ msg = poll();
+ }
+ }
+
+
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages)
+ {
+ AMQMessage message = messages.peek();
+
+ while (message != null && message.taken())
+ {
+ //remove the already taken message
+ messages.poll();
+ // try the next message
+ message = messages.peek();
+ }
+ return message;
+ }
+
+ public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue, AMQQueue queue)
+ {
+ AMQMessage message = null;
+ try
+ {
+ message = getNextMessage(messageQueue);
+
+ // message will be null if we have no messages in the messageQueue.
+ if (message == null)
+ {
+ return;
+ }
+ _log.info("Async Delivery Message:" + message + " to :" + sub);
+
+ sub.send(message, queue);
+ message.setDeliveredToConsumer();
+
+ //remove sent message from our queue.
+ messageQueue.poll();
+ }
+ catch (FailedDequeueException e)
+ {
+ message.release();
+ _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ }
+ }
+
+ /**
+ * Only one thread should ever execute this method concurrently, but
+ * it can do so while other threads invoke deliver().
+ */
+ private void processQueue()
+ {
+ // Continue to process delivery while we haveSubscribers and messages
+ boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
+
+ while (hasSubscribers && hasQueuedMessages())
+ {
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ if (!sub.isSuspended())
+ {
+ if (sub.hasFilters())
+ {
+ sendNextMessage(sub, sub.getPreDeliveryQueue(), _queue);
+ }
+ else
+ {
+ sendNextMessage(sub, _messages, _queue);
+ }
+
+ hasSubscribers = true;
+ }
+ else
+ {
+ hasSubscribers = false;
+ }
+ }
+ }
+ }
+
+ private AMQMessage poll()
+ {
+ return _messages.poll();
+ }
+
+ public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ {
+ _log.info("deliver :" + msg);
+
+ //Check if we have someone to deliver the message to.
+ _lock.lock();
+ try
+ {
+ Subscription s = _subscriptions.nextSubscriber(msg);
+
+ if (s == null) //no-one can take the message right now.
+ {
+ _log.info("Testing Message(" + msg + ") for Queued Delivery");
+ if (!msg.isImmediate())
+ {
+ addMessageToQueue(msg);
+
+ //release lock now message is on queue.
+ _lock.unlock();
+
+ //Pre Deliver to all subscriptions
+ _log.info("We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+
+ // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
+ if (_queue.isShared() && msg.getDeliveredToConsumer())
+ {
+ _log.info("Stopping PreDelivery as message(" + msg + ") is already delivered.");
+ continue;
+ }
+
+ // Only give the message to those that want them.
+ if (sub.hasFilters() && sub.hasInterest(msg))
+ {
+ sub.enqueueForPreDelivery(msg);
+ }
+ }
+ }
+ }
+ else
+ {
+ //release lock now
+ _lock.unlock();
+
+ _log.info("Delivering Message:" + msg + " to(" + System.identityHashCode(s) + ") :" + s);
+ //Deliver the message
+ s.send(msg, _queue);
+ msg.setDeliveredToConsumer();
+ }
+ }
+ finally
+ {
+ //ensure lock is released
+ if (_lock.isLocked())
+ {
+ _lock.unlock();
+ }
+ }
+ }
+
+ Runner asyncDelivery = new Runner();
+
+ private class Runner implements Runnable
+ {
+ public void run()
+ {
+ boolean running = true;
+ while (running)
+ {
+ processQueue();
+
+ //Check that messages have not been added since we did our last peek();
+ // Synchronize with the thread that adds to the queue.
+ // If the queue is still empty then we can exit
+
+ if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
+ {
+ running = false;
+ _processing.set(false);
+ }
+ }
+ }
+ }
+
+ public void processAsync(Executor executor)
+ {
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+
+ if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
+ {
+ //are we already running? if so, don't re-run
+ if (_processing.compareAndSet(false, true))
+ {
+ executor.execute(asyncDelivery);
+ }
+ }
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 49f0a51bf2..523b5f06e9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.AMQException;
+
+import java.util.Queue;
+
public interface Subscription
{
void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
@@ -27,4 +31,13 @@ public interface Subscription
boolean isSuspended();
void queueDeleted(AMQQueue queue);
+
+ boolean hasFilters();
+
+ boolean hasInterest(AMQMessage msg);
+
+ Queue<AMQMessage> getPreDeliveryQueue();
+
+ void enqueueForPreDelivery(AMQMessage msg);
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index 0fd44e4fbc..f464384562 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,6 +33,9 @@ import org.apache.qpid.AMQException;
*/
public interface SubscriptionFactory
{
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException;
+
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 5cad28b80d..79b0593f69 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -23,12 +23,18 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.Queue;
+
/**
* Encapsulation of a supscription to a queue.
* <p/>
@@ -48,23 +54,32 @@ public class SubscriptionImpl implements Subscription
private final Object sessionKey;
+ private Queue<AMQMessage> _messages;
+
+
/**
* True if messages need to be acknowledged
*/
private final boolean _acks;
+ private FilterManager _filters;
public static class Factory implements SubscriptionFactory
{
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
+ }
+
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
}
}
@@ -72,6 +87,13 @@ public class SubscriptionImpl implements Subscription
String consumerTag, boolean acks)
throws AMQException
{
+ this(channelId, protocolSession, consumerTag, acks, null);
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks, FieldTable filters)
+ throws AMQException
+ {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -83,6 +105,17 @@ public class SubscriptionImpl implements Subscription
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _filters = FilterManagerFactory.createManager(filters);
+
+ if (_filters != null)
+ {
+ _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ else
+ {
+ // Reference the DeliveryManager
+ _messages = null;
+ }
}
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
@@ -131,7 +164,7 @@ public class SubscriptionImpl implements Subscription
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
-
+
// By doing this _before_ the send we ensure that it
// doesn't get sent if it can't be dequeued, preventing
// duplicate delivery on recovery.
@@ -178,6 +211,32 @@ public class SubscriptionImpl implements Subscription
channel.queueDeleted(queue);
}
+ public boolean hasFilters()
+ {
+ return _filters != null;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return _filters.allAllow(msg);
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return _messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ if (_messages != null)
+ {
+ _messages.offer(msg);
+ }
+ }
+
+
+
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
index 353b461c8d..4df88baebc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
/**
* Abstraction of actor that will determine the subscriber to whom
* a message will be sent.
*/
public interface SubscriptionManager
{
+ public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
public Subscription nextSubscriber(AMQMessage msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 7cc3f5f719..a4afe18e4d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -58,6 +60,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -90,7 +93,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -105,31 +108,58 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+ if (!subscription.hasFilters())
+ {
+ return subscription;
+ }
+ else
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
}
}
+
return null;
}
@@ -145,11 +175,19 @@ class SubscriptionSet implements WeightedSubscriptionManager
return _subscriptions.isEmpty();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
public boolean hasActiveSubscribers()
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -159,7 +197,10 @@ class SubscriptionSet implements WeightedSubscriptionManager
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -177,7 +218,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index ea64952bc7..49b0111b67 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
class SynchronizedDeliveryManager implements DeliveryManager
{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+ private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
/**
* Holds any queued messages
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index c25eb1f2c3..4c57f28fef 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -23,11 +23,13 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
@@ -49,6 +51,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -176,7 +179,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (message.deliverBody != null)
{
- final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
if (consumer == null)
{
@@ -210,17 +213,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
}
+
}
catch (Exception e)
{
@@ -734,7 +735,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
-
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
@@ -954,6 +954,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
registerConsumer(consumer, false);
}
+ catch (AMQInvalidSelectorException ise)
+ {
+ JMSException ex = new InvalidSelectorException(ise.getMessage());
+ ex.setLinkedException(ise);
+ throw ex;
+ }
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +969,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized(destination)
{
- _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
}
@@ -975,16 +981,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void checkTemporaryDestination(Destination destination)
throws JMSException
{
- if((destination instanceof TemporaryDestination))
+ if ((destination instanceof TemporaryDestination))
{
_logger.debug("destination is temporary");
final TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession() != this)
+ if (tempDest.getSession() != this)
{
_logger.debug("destination is on different session");
throw new JMSException("Cannot consume from a temporary destination created onanother session");
}
- if(tempDest.isDeleted())
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1071,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
- boolean nowait) throws AMQException
+ boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if (messageSelector != null)
+ {
+ //fixme move literal value to a common class.
+ arguments.put("x-filter-jms-selector", messageSelector);
+ }
+
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
@@ -1080,7 +1093,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
queueName, tag, consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait);
+ consumer.isExclusive(), nowait, arguments);
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1233,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1247,8 +1260,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1278,8 +1291,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1476,7 +1489,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ try
+ {
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ }
+ catch (JMSException e) //thrown by getMessageSelector
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
}
/**
@@ -1489,7 +1509,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_consumers.remove(consumer.getConsumerTag());
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if(subscriptionName != null)
+ if (subscriptionName != null)
{
_subscriptions.remove(subscriptionName);
}
@@ -1497,7 +1517,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
Destination dest = consumer.getDestination();
synchronized(dest)
{
- if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
{
_destinationConsumerCount.remove(dest);
}
@@ -1576,7 +1596,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
throw new javax.jms.InvalidDestinationException("Invalid Topic");
}
- if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+ if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 2bd93f1508..fd2968cdfd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
- _logger.debug("ChannelClose method received");
+ _logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
int errorCode = method.replyCode;
@@ -65,17 +66,21 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
{
throw new AMQNoConsumersException("Error: " + reason, null);
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+ {
+ _logger.info("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(reason);
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- throw new AMQNoRouteException("Error: " + reason, null);
- }
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason);
- }
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
+
}
evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index fea7a29594..572739d0b1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -26,7 +26,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 492571b6af..21ae3fc71f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -110,7 +110,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
}
else
{
- throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+ throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 0de2850080..d6364f45b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -59,7 +59,7 @@ public class SocketTransportConnection implements ITransportConnection
// once more testing of the performance of the simple allocator has been done
if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
{
- _logger.warn("Using SimpleByteBufferAllocator");
+ _logger.info("Using SimpleByteBufferAllocator");
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index 53e7fd066e..5497cafed4 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -40,11 +40,15 @@ import javax.naming.spi.InitialContextFactory;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
- protected final Logger _logger = Logger.getLogger(getClass());
+ protected final Logger _logger = Logger.getLogger(PropertiesFileInitialContextFactory.class);
private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
private String DESTINATION_PREFIX = "destination.";
@@ -55,6 +59,41 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
Map data = new ConcurrentHashMap();
+ try
+ {
+
+ String file = null;
+ if (environment.contains(Context.PROVIDER_URL))
+ {
+ file = (String) environment.get(Context.PROVIDER_URL);
+ }
+ else
+ {
+ file = System.getProperty(Context.PROVIDER_URL);
+ }
+
+ if (file != null)
+ {
+ _logger.info("Loading Properties from:" + file);
+ //Load the properties specified
+ Properties p = new Properties();
+
+ p.load(new BufferedInputStream(new FileInputStream(file)));
+
+ environment.putAll(p);
+ _logger.info("Loaded Context Properties:" + environment.toString());
+ }
+ else
+ {
+ _logger.warn("No Provider URL specified.");
+ }
+ }
+ catch (IOException ioe)
+ {
+ _logger.warn("Unable to load property file specified in Provider_URL:" +
+ environment.get(Context.PROVIDER_URL));
+ }
+
createConnectionFactories(data, environment);
createDestinations(data, environment);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
new file mode 100644
index 0000000000..27a2ccb32e
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.DeliveryMode;
+
+import junit.framework.TestCase;
+
+public class SelectorTest extends TestCase implements MessageListener
+{
+
+ private final static Logger _logger = org.apache.log4j.Logger.getLogger(SelectorTest.class);
+
+ private AMQConnection _connection;
+ private AMQDestination _destination;
+ private AMQSession _session;
+ private int count;
+ public String _connectionString = "vm://:1";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ private void init(AMQConnection connection) throws Exception
+ {
+ init(connection, new AMQQueue(randomize("SessionStartTest"), true));
+ }
+
+ private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ {
+ _connection = connection;
+ _destination = destination;
+ connection.start();
+
+
+ String selector = null;
+// selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ //_session.createConsumer(destination).setMessageListener(this);
+ _session.createConsumer(destination, selector).setMessageListener(this);
+ }
+
+ public synchronized void test() throws JMSException, InterruptedException
+ {
+ try
+ {
+ Message msg = _session.createTextMessage("Message");
+ msg.setJMSPriority(1);
+ msg.setIntProperty("Cost", 2);
+ msg.setJMSType("Special");
+
+ _logger.info("Sending Message:" + msg);
+
+ ((BasicMessageProducer) _session.createProducer(_destination)).send(msg, DeliveryMode.NON_PERSISTENT);
+ System.out.println("Message sent, waiting for response...");
+ wait(1000);
+
+ if (count > 0)
+ {
+ _logger.info("Got message");
+ }
+
+ if (count == 0)
+ {
+ fail("Did not get message!");
+ //throw new RuntimeException("Did not get message!");
+ }
+ }
+ finally
+ {
+ _session.close();
+ _connection.close();
+ }
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ count++;
+ _logger.info("Got Message:" + message);
+ notify();
+ }
+
+ private static String randomize(String in)
+ {
+ return in + System.currentTimeMillis();
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ SelectorTest test = new SelectorTest();
+ test._connectionString = argv.length == 0 ? "localhost:5672" : argv[0];
+ test.setUp();
+ test.test();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(SelectorTest.class);
+ }
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
index fa20e9ab76..39ae7e3c3e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -82,6 +84,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet
return ClusteredSubscriptionManager.this.getWeight();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
+ }
+
public boolean hasActiveSubscribers()
{
return ClusteredSubscriptionManager.super.hasActiveSubscribers();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
index d01ebb5ba2..0566c5203b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
@@ -21,12 +21,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -44,11 +44,24 @@ class NestedSubscriptionManager implements SubscriptionManager
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -59,9 +72,9 @@ class NestedSubscriptionManager implements SubscriptionManager
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -94,7 +107,7 @@ class NestedSubscriptionManager implements SubscriptionManager
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 0268ff2171..cc7f6ecd2a 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -25,6 +25,9 @@ import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -88,9 +96,34 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
}
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
+ }
}
diff --git a/java/common/src/main/java/log4j.properties b/java/common/src/main/java/log4j.properties
new file mode 100644
index 0000000000..6d596d1d19
--- /dev/null
+++ b/java/common/src/main/java/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
new file mode 100644
index 0000000000..dcd039b789
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidSelectorException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AMQInvalidSelectorException extends AMQException
+{
+ public AMQInvalidSelectorException(String message)
+ {
+ super(AMQConstant.INVALID_SELECTOR.getCode(),message);
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
index 1292ff2f6e..83cd204ca9 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
@@ -483,7 +483,7 @@ public class PropertyFieldTable implements FieldTable
{
return _properties.containsKey(name) && (_properties.get(name) == null) &&
_propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX);
-
+
}
@@ -606,7 +606,8 @@ public class PropertyFieldTable implements FieldTable
// AMQ start character
if (!(Character.isLetter(propertyName.charAt(0))
|| propertyName.charAt(0) == '$'
- || propertyName.charAt(0) == '#'))
+ || propertyName.charAt(0) == '#'
+ || propertyName.charAt(0) == '_')) // Not official AMQP added for JMS.
{
throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character");
}
@@ -1156,9 +1157,9 @@ public class PropertyFieldTable implements FieldTable
if (type == null)
{
String msg = "Field '" + key + "' - unsupported field table type: " + type + ".";
- //some extra trace information...
- msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
- throw new AMQFrameDecodingException(msg);
+ //some extra trace information...
+ msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
+ throw new AMQFrameDecodingException(msg);
}
Object value;
@@ -1203,7 +1204,7 @@ public class PropertyFieldTable implements FieldTable
value = EncodingUtils.readBytes(buffer);
break;
default:
- String msg = "Internal error, the following type identifier is not handled: " + type;
+ String msg = "Internal error, the following type identifier is not handled: " + type;
throw new AMQFrameDecodingException(msg);
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index fc83c0726d..a0d243ca30 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -55,7 +55,7 @@ public final class AMQConstant
{
return _name;
}
-
+
public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true);
@@ -74,6 +74,8 @@ public final class AMQConstant
public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
+ public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
+
public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index c455d771cb..8de3c8bf33 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -30,7 +30,7 @@ import javax.management.JMException;
/**
* Test class to test AMQQueueMBean attribtues and operations
*/
-public class AMQQueueMBeanTest extends TestCase
+public class AMQQueueMBeanTest extends TestCase
{
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
@@ -62,14 +62,14 @@ public class AMQQueueMBeanTest extends TestCase
assertFalse(mgr.hasActiveSubscribers());
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- _channel = new AMQChannel(1, _messageStore, null);
+ _channel = new AMQChannel(1, _messageStore, null);
_protocolSession = new MockProtocolSession(_messageStore);
_protocolSession.addChannel(_channel);
- _queue.registerProtocolSession(_protocolSession, 1, "test", false);
+ _queue.registerProtocolSession(_protocolSession, 1, "test", false, null);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
- SubscriptionSet _subscribers = (SubscriptionSet)mgr;
+ SubscriptionSet _subscribers = (SubscriptionSet) mgr;
SubscriptionTestHelper s1 = new SubscriptionTestHelper("S1");
SubscriptionTestHelper s2 = new SubscriptionTestHelper("S2");
_subscribers.addSubscriber(s1);
@@ -161,7 +161,7 @@ public class AMQQueueMBeanTest extends TestCase
super.setUp();
_queueRegistry = new DefaultQueueRegistry();
_queue = new AMQQueue("testQueue", false, "AMQueueMBeanTest", false, _queueRegistry);
- _queueMBean = new AMQQueueMBean(_queue);
+ _queueMBean = new AMQQueueMBean(_queue);
}
private void sendMessages(int messageCount) throws AMQException
@@ -169,7 +169,8 @@ public class AMQQueueMBeanTest extends TestCase
AMQMessage[] messages = new AMQMessage[messageCount];
for (int i = 0; i < messages.length; i++)
{
- messages[i] = message(false);;
+ messages[i] = message(false);
+ ;
}
for (int i = 0; i < messageCount; i++)
{
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 2773c810d2..2de22f9084 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
public class SubscriptionTestHelper implements Subscription
{
@@ -70,6 +71,26 @@ public class SubscriptionTestHelper implements Subscription
{
}
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op
+ }
+
public int hashCode()
{
return key.hashCode();
diff --git a/specs/amqp-8.0.xml b/specs/amqp-8.0.xml
index a9a9b21ecc..b84751c398 100644
--- a/specs/amqp-8.0.xml
+++ b/specs/amqp-8.0.xml
@@ -2105,6 +2105,14 @@ localised reply text
method it will raise a channel or connection exception.
</doc>
</field>
+
+ <field name="arguments" type="table" label="arguments for consuming">
+ <doc>
+ A set of arguments for the consume. The syntax and semantics
+ of these arguments depends on the server implementation. This
+ field is ignored if passive is 1.
+ </doc>
+ </field>
</method>
<method name = "consume-ok" synchronous = "1" index = "21">